Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ members = [
]

[workspace.package]
version = "0.34.0"
version = "0.35.0"


[workspace.lints.clippy]
Expand Down
45 changes: 44 additions & 1 deletion libs/opsqueue_python/python/opsqueue/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Expected errors:

from . import opsqueue_internal
from typing import Optional


class SubmissionFailedError(Exception):
Expand Down Expand Up @@ -38,6 +39,35 @@ def __repr__(self) -> str:
return str(self)


class SubmissionNotCancellableError(Exception):
__slots__ = ["submission", "chunk"]
"""Raised when a submission could not be cancelled due to already being
completed, failed or cancelled.

"""

def __init__(
self,
submission: opsqueue_internal.SubmissionNotCancellable,
chunk: Optional[opsqueue_internal.ChunkFailed] = None,
):
super().__init__()
self.submission = submission
self.chunk = chunk

def __str__(self) -> str:
chunk_str = f"\n{self.chunk}"
return f"""
Submission {self.submission.submission.id} was not cancelled because:

{self.submission}
{"" if self.chunk is None else chunk_str}
"""

def __repr__(self) -> str:
return str(self)


## Usage errors:


Expand Down Expand Up @@ -76,7 +106,20 @@ class SubmissionNotFoundError(IncorrectUsageError):
but the submission doesn't exist within the Opsqueue.
"""

pass
__slots__ = ["submission_id"]

def __init__(
self,
submission_id: int,
):
super().__init__()
self.submission_id = submission_id

def __str__(self) -> str:
return f"Submission {self.submission_id} could not be found"

def __repr__(self) -> str:
return str(self)


class ChunkCountIsZeroError(IncorrectUsageError):
Expand Down
21 changes: 20 additions & 1 deletion libs/opsqueue_python/python/opsqueue/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
)
from . import opsqueue_internal
from . import tracing
from opsqueue.exceptions import SubmissionFailedError
from opsqueue.exceptions import (
SubmissionFailedError,
SubmissionNotCancellableError,
SubmissionNotFoundError,
)
from .opsqueue_internal import ( # type: ignore[import-not-found]
SubmissionId,
SubmissionStatus,
Expand All @@ -30,6 +34,8 @@
"SubmissionCompleted",
"SubmissionFailedError",
"SubmissionFailed",
"SubmissionNotCancellableError",
"SubmissionNotFoundError",
"ChunkFailed",
]

Expand Down Expand Up @@ -314,6 +320,19 @@ def count_submissions(self) -> int:
"""
return self.inner.count_submissions() # type: ignore[no-any-return]

def cancel_submission(self, submission_id: SubmissionId) -> None:
"""
Cancel a specific submission that is in progress.

Returns None if the submission was successfully cancelled.

Raises:
- `SubmissionNotCancellableError` if the submission could not be
cancelled because it was already completed, failed or cancelled.
- `SubmissionNotFoundError` if the submission could not be found.
"""
self.inner.cancel_submission(submission_id)

def get_submission_status(
self, submission_id: SubmissionId
) -> SubmissionStatus | None:
Expand Down
97 changes: 90 additions & 7 deletions libs/opsqueue_python/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,19 @@ impl From<opsqueue::common::submission::SubmissionFailed> for SubmissionFailed {
}
}

impl From<opsqueue::common::submission::SubmissionCancelled> for SubmissionCancelled {
fn from(value: opsqueue::common::submission::SubmissionCancelled) -> Self {
Self {
id: value.id.into(),
chunks_total: value.chunks_total.into(),
chunks_done: value.chunks_done.into(),
metadata: value.metadata,
strategic_metadata: value.strategic_metadata,
cancelled_at: value.cancelled_at,
}
}
}

#[pyclass(frozen, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubmissionStatus {
Expand All @@ -331,6 +344,9 @@ pub enum SubmissionStatus {
submission: SubmissionFailed,
chunk: ChunkFailed,
},
Cancelled {
submission: SubmissionCancelled,
},
}

impl From<opsqueue::common::submission::SubmissionStatus> for SubmissionStatus {
Expand All @@ -348,10 +364,20 @@ impl From<opsqueue::common::submission::SubmissionStatus> for SubmissionStatus {
let submission = s.into();
SubmissionStatus::Failed { submission, chunk }
}
Cancelled(s) => SubmissionStatus::Cancelled {
submission: s.into(),
},
}
}
}

#[pymethods]
impl SubmissionStatus {
fn __repr__(&self) -> String {
format!("{self:?}")
}
}

#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Submission {
Expand Down Expand Up @@ -385,13 +411,6 @@ impl Submission {
}
}

#[pymethods]
impl SubmissionStatus {
fn __repr__(&self) -> String {
format!("{self:?}")
}
}

#[pymethods]
impl SubmissionCompleted {
fn __repr__(&self) -> String {
Expand All @@ -414,6 +433,14 @@ impl SubmissionFailed {
}
}

#[pymethods]
impl SubmissionCancelled {
fn __repr__(&self) -> String {
format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?}, cancelled_at={5})",
self.id.__repr__(), self.chunks_total, self.chunks_done, self.metadata, self.strategic_metadata, self.cancelled_at)
}
}

#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmissionCompleted {
Expand All @@ -435,6 +462,62 @@ pub struct SubmissionFailed {
pub failed_chunk_id: u64,
}

#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmissionCancelled {
pub id: SubmissionId,
pub chunks_total: u64,
pub chunks_done: u64,
pub metadata: Option<submission::Metadata>,
pub strategic_metadata: Option<StrategicMetadataMap>,
pub cancelled_at: DateTime<Utc>,
}

/// Submission could not be cancelled because it was already completed, failed
/// or cancelled.
#[pyclass(frozen, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubmissionNotCancellable {
Completed {
submission: SubmissionCompleted,
},
Failed {
submission: SubmissionFailed,
chunk: ChunkFailed,
},
Cancelled {
submission: SubmissionCancelled,
},
}

impl From<opsqueue::common::errors::SubmissionNotCancellable> for SubmissionNotCancellable {
fn from(value: opsqueue::common::errors::SubmissionNotCancellable) -> Self {
use opsqueue::common::errors::SubmissionNotCancellable::*;
match value {
Completed(s) => SubmissionNotCancellable::Completed {
submission: s.into(),
},
Failed(s, c) => {
let chunk = ChunkFailed::from_internal(c, &s);
SubmissionNotCancellable::Failed {
submission: s.into(),
chunk,
}
}
Cancelled(s) => SubmissionNotCancellable::Cancelled {
submission: s.into(),
},
}
}
}

#[pymethods]
impl SubmissionNotCancellable {
fn __repr__(&self) -> String {
format!("{self:?}")
}
}

pub async fn run_unless_interrupted<T, E>(
future: impl IntoFuture<Output = Result<T, E>>,
) -> Result<T, E>
Expand Down
22 changes: 19 additions & 3 deletions libs/opsqueue_python/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
/// NOTE: We defne the potentially raisable errors/exceptions in Python
/// NOTE: We define the potentially raisable errors/exceptions in Python
/// so we have nice IDE support for docs-on-hover and for 'go to definition'.
use std::error::Error;

use opsqueue::common::chunk::ChunkId;
use opsqueue::common::errors::{
ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E,
ChunkNotFound, IncorrectUsage, SubmissionNotCancellable, SubmissionNotFound,
UnexpectedOpsqueueConsumerServerResponse, E,
};
use pyo3::exceptions::PyBaseException;
use pyo3::{import_exception, Bound, PyErr, Python};

use crate::common;
use crate::common::{ChunkIndex, SubmissionId};

// Expected errors:
Expand All @@ -19,6 +21,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError);
import_exception!(opsqueue.exceptions, TryFromIntError);
import_exception!(opsqueue.exceptions, ChunkNotFoundError);
import_exception!(opsqueue.exceptions, SubmissionNotFoundError);
import_exception!(opsqueue.exceptions, SubmissionNotCancellableError);
import_exception!(opsqueue.exceptions, NewObjectStoreClientError);
import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError);

Expand Down Expand Up @@ -123,10 +126,23 @@ impl<T: Error> From<CError<IncorrectUsage<T>>> for PyErr {
}
}

impl From<CError<SubmissionNotCancellable>> for PyErr {
fn from(value: CError<SubmissionNotCancellable>) -> Self {
let c: Option<common::ChunkFailed> = match &value.0 {
opsqueue::common::errors::SubmissionNotCancellable::Failed(submission, chunk) => Some(
common::ChunkFailed::from_internal(chunk.clone(), submission),
),
_ => None,
};
let s: common::SubmissionNotCancellable = value.0.into();
SubmissionNotCancellableError::new_err((s, c))
}
}

impl From<CError<SubmissionNotFound>> for PyErr {
fn from(value: CError<SubmissionNotFound>) -> Self {
let submission_id = value.0 .0;
SubmissionNotFoundError::new_err((value.0.to_string(), SubmissionId::from(submission_id)))
SubmissionNotFoundError::new_err(u64::from(submission_id))
}
}

Expand Down
2 changes: 2 additions & 0 deletions libs/opsqueue_python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<common::Submission>()?;
m.add_class::<common::SubmissionCompleted>()?;
m.add_class::<common::SubmissionFailed>()?;
m.add_class::<common::SubmissionCancelled>()?;
m.add_class::<common::SubmissionNotCancellable>()?;
m.add_class::<producer::PyChunksIter>()?;
m.add_class::<consumer::ConsumerClient>()?;
m.add_class::<producer::ProducerClient>()?;
Expand Down
29 changes: 29 additions & 0 deletions libs/opsqueue_python/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pyo3::{
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use opsqueue::{
common::errors::E::{self, L, R},
common::errors::{SubmissionNotCancellable, SubmissionNotFound},
object_store::{ChunksStorageError, NewObjectStoreClientError},
producer::client::{Client as ActualClient, InternalProducerClientError},
};
Expand Down Expand Up @@ -118,6 +119,34 @@ impl ProducerClient {
})
}

/// Cancel a submission.
///
/// Will return an error if the submission is already complete, failed, or
/// cancelled, or if the submission could not be found.
pub fn cancel_submission(
&self,
py: Python<'_>,
id: SubmissionId,
) -> CPyResult<
(),
E<
FatalPythonException,
E<SubmissionNotFound, E<SubmissionNotCancellable, InternalProducerClientError>>,
>,
> {
py.allow_threads(|| {
self.block_unless_interrupted(async {
self.producer_client
.cancel_submission(id.into())
.await
.map_err(|e| CError(R(e)))
})
// TODO ?
// .map(|opt| opt.map(Into::into))
// .map_err(|e| ProducerClientError::new_err(e.to_string()))
})
}

/// Retrieve the status (in progress, completed or failed) of a specific submission.
///
/// The returned SubmissionStatus object also includes the number of chunks finished so far,
Expand Down
Loading