infrahub_sdk.client
Functions
handle_relogin
handle_relogin(func: Callable[..., Coroutine[Any, Any, httpx.Response]]) -> Callable[..., Coroutine[Any, Any, httpx.Response]]
handle_relogin_sync
handle_relogin_sync(func: Callable[..., httpx.Response]) -> Callable[..., httpx.Response]
raise_for_error_deprecation_warning
raise_for_error_deprecation_warning(value: bool | None) -> None
Classes
ProcessRelationsNode
ProxyConfig
ProxyConfigSync
ProcessRelationsNodeSync
BaseClient
Base class for InfrahubClient and InfrahubClientSync
Methods:
request_context
request_context(self) -> RequestContext | None
request_context
request_context(self, request_context: RequestContext) -> None
start_tracking
start_tracking(self, identifier: str | None = None, params: dict[str, Any] | None = None, delete_unused_nodes: bool = False, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> Self
set_context_properties
set_context_properties(self, identifier: str, params: dict[str, str] | None = None, delete_unused_nodes: bool = True, reset: bool = True, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> None
InfrahubClient
GraphQL Client to interact with Infrahub.
Methods:
get_version
get_version(self) -> str
Return the Infrahub version.
get_user
get_user(self) -> dict
Return user information
get_user_permissions
get_user_permissions(self) -> dict
Return user permissions
create
create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNode
create
create(self, kind: type[SchemaType], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaType
create
create(self, kind: str | type[SchemaType], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNode | SchemaType
delete
delete(self, kind: str | type[SchemaType], id: str, branch: str | None = None) -> None
get
get(self, kind: type[SchemaType], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType | None
get
get(self, kind: type[SchemaType], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType
get
get(self, kind: type[SchemaType], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaType
get
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode | None
get
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode
get
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNode
get
get(self, kind: str | type[SchemaType], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, **kwargs: Any) -> InfrahubNode | SchemaType | None
count
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int
Return the number of nodes of a given kind.
all
all(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[SchemaType]
all
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[InfrahubNode]
all
all(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False) -> list[InfrahubNode] | list[SchemaType]
Retrieve all nodes of a given kind
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to prefetch related node data.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting `disable=True` improves performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
Returns:
- list[InfrahubNode]: List of Nodes
filters
filters(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[SchemaType]
filters
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[InfrahubNode]
filters
filters(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, **kwargs: Any) -> list[InfrahubNode] | list[SchemaType]
Retrieve nodes of a given kind based on provided filters.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to prefetch related node data.
- partial_match: Allow partial match of filter criteria for the query.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting `disable=True` improves performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- **kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNode]: List of Nodes that match the given filters.
clone
clone(self, branch: str | None = None) -> InfrahubClient
Return a cloned version of the client using the same configuration
execute_graphql
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool | None = None, tracker: str | None = None) -> dict
Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
- query: GraphQL query to execute, can be a query or a mutation.
- variables: Variables to pass along with the GraphQL query. Defaults to None.
- branch_name: Name of the branch on which the query will be executed. Defaults to None.
- at: Time when the query should be executed. Defaults to None.
- timeout: Timeout in seconds for the query. Defaults to None.
- raise_for_error: Deprecated. Controls only HTTP status handling. Defaults to None.
  - None (default) or True: HTTP errors raise via resp.raise_for_status().
  - False: HTTP errors are not automatically raised.
Raises:
GraphQLError: When the GraphQL response contains errors.
Returns:
- The GraphQL data payload (response["data"]).
refresh_login
refresh_login(self) -> None
login
login(self, refresh: bool = False) -> None
query_gql_query
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> dict
create_diff
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> list[NodeDiff]
get_diff_tree
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaType
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNode | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNode | SchemaType | None
Allocate a new IP address by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
- prefix_length: Length of the prefix to set on the address to allocate.
- address_type: Kind of the address to allocate.
- data: A key/value map to use to set attribute values on the allocated address.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Optional tracker identifier sent along with the query.
- raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.
Returns: InfrahubNode: Node corresponding to the allocated resource.
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaType
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaType | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaType
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNode
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNode | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNode | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNode | SchemaType | None
Allocate a new IP prefix by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value to perform idempotent allocation, the same resource will be returned for a given identifier.
- prefix_length: Length of the prefix to allocate.
- member_type: Member type of the prefix to allocate.
- prefix_type: Kind of the prefix to allocate.
- data: A key/value map to use to set attribute values on the allocated prefix.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Optional tracker identifier sent along with the query.
- raise_for_error: Deprecated, raise an error if the HTTP status is not 2XX.
Returns: InfrahubNode: Node corresponding to the allocated resource.
create_batch
create_batch(self, return_exceptions: bool = False) -> InfrahubBatch
get_list_repositories
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
repository_update_commit
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
convert_object_type
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNode
Convert a given node to another kind on a given branch. The keys of fields_mapping are target field names
and their values indicate how to fill in these fields. Any mandatory field not having an equivalent field
in the source kind should be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type
for more information.
InfrahubClientSync
Methods:
get_version
get_version(self) -> str
Return the Infrahub version.
get_user
get_user(self) -> dict
Return user information
get_user_permissions
get_user_permissions(self) -> dict
Return user permissions
create
create(self, kind: str, data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> InfrahubNodeSync
create
create(self, kind: type[SchemaTypeSync], data: dict | None = ..., branch: str | None = ..., **kwargs: Any) -> SchemaTypeSync
create
create(self, kind: str | type[SchemaTypeSync], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync
delete
delete(self, kind: str | type[SchemaTypeSync], id: str, branch: str | None = None) -> None
clone
clone(self, branch: str | None = None) -> InfrahubClientSync
Return a cloned version of the client using the same configuration
execute_graphql
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, raise_for_error: bool | None = None, tracker: str | None = None) -> dict
Execute a GraphQL query (or mutation). If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
- query: GraphQL query to execute, can be a query or a mutation.
- variables: Variables to pass along with the GraphQL query. Defaults to None.
- branch_name: Name of the branch on which the query will be executed. Defaults to None.
- at: Time when the query should be executed. Defaults to None.
- timeout: Timeout in seconds for the query. Defaults to None.
- raise_for_error: Deprecated. Controls only HTTP status handling. Defaults to None.
  - None (default) or True: HTTP errors raise via resp.raise_for_status().
  - False: HTTP errors are not automatically raised.
  GraphQL errors always raise GraphQLError.
Raises:
GraphQLError: When the GraphQL response contains errors.
Returns:
- The GraphQL data payload (response["data"]).
count
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, **kwargs: Any) -> int
Return the number of nodes of a given kind.
all
all(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[SchemaTypeSync]
all
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ...) -> list[InfrahubNodeSync]
all
all(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve all nodes of a given kind
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to prefetch related node data.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting `disable=True` improves performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
Returns:
- list[InfrahubNodeSync]: List of Nodes
filters
filters(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[SchemaTypeSync]
filters
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., **kwargs: Any) -> list[InfrahubNodeSync]
filters
filters(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, **kwargs: Any) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve nodes of a given kind based on provided filters.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to prefetch related node data.
- partial_match: Allow partial match of filter criteria for the query.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting `disable=True` improves performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- **kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNodeSync]: List of Nodes that match the given filters.
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync | None
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync
get
get(self, kind: type[SchemaTypeSync], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> SchemaTypeSync
get
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync | None
get
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync
get
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., **kwargs: Any) -> InfrahubNodeSync
get
get(self, kind: str | type[SchemaTypeSync], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync | None
create_batch
create_batch(self, return_exceptions: bool = False) -> InfrahubBatchSync
Create a batch to execute multiple queries concurrently.
Executing the batch will be performed using a thread pool, meaning it cannot guarantee the execution order. It is not recommended to use such a batch to manipulate objects that depend on each other.
get_list_repositories
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
query_gql_query
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> dict
create_diff
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> list[NodeDiff]
get_diff_tree
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaTypeSync
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNodeSync | None
allocate_next_ip_address
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP address by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.
identifier: Value to perform idempotent allocation; the same resource will be returned for a given identifier.
prefix_length: Length of the prefix to set on the address to allocate.
address_type: Kind of the address to allocate.
data: A key/value map used to set attribute values on the allocated address.
branch: Name of the branch to allocate from. Defaults to default_branch.
timeout: Timeout to apply to the allocation request.
tracker: Optional tracker name attached to the allocation request.
raise_for_error (deprecated): Whether to raise an error when the request fails; when False, None may be returned instead of a node.
Returns: InfrahubNodeSync: Node corresponding to the allocated resource.
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> SchemaTypeSync
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> SchemaTypeSync | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> SchemaTypeSync
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[True] = True) -> CoreNodeSync
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: Literal[False] = False) -> CoreNodeSync | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ..., raise_for_error: bool | None = ...) -> CoreNodeSync | None
allocate_next_ip_prefix
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None, raise_for_error: bool | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP prefix by using the provided resource pool.
Args:
resource_pool: Node corresponding to the pool to allocate resources from.
identifier: Value to perform idempotent allocation; the same resource will be returned for a given identifier.
prefix_length: Length of the prefix to allocate.
member_type: Member type of the prefix to allocate.
prefix_type: Kind of the prefix to allocate.
data: A key/value map used to set attribute values on the allocated prefix.
branch: Name of the branch to allocate from. Defaults to default_branch.
timeout: Timeout to apply to the allocation request.
tracker: Optional tracker name attached to the allocation request.
raise_for_error (deprecated): Whether to raise an error when the request fails; when False, None may be returned instead of a node.
Returns: InfrahubNodeSync: Node corresponding to the allocated resource.
repository_update_commit
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
refresh_login
refresh_login(self) -> None
login
login(self, refresh: bool = False) -> None
convert_object_type
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNodeSync
Convert a given node to another kind on a given branch. The keys of fields_mapping are target field names
and its values indicate how to fill in these fields. Any mandatory field not having an equivalent field
in the source kind should be specified in this mapping. See https://docs.infrahub.app/guides/object-convert-type
for more information.