Skip to content

Commit

Permalink
doc(python): add python tabs in getters and step jobs pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jul 7, 2023
1 parent b0adbf8 commit f1ca525
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/gitbook/bull-3.x-migration/compatibility-class.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ queue.add("cat", { sound: "meow" });
queue.add("cow", { sound: "moo" });
queue.add("dog", { sound: "bark" });
```

## Read more:

- 💡 [Queue3 API Reference](https://api.docs.bullmq.io/classes/v1.Queue3.html)
36 changes: 36 additions & 0 deletions docs/gitbook/guide/jobs/getters.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ When jobs are added to a queue, they will be in different statuses during their

It is often necessary to know how many jobs are in a given status:

{% tabs %}
{% tab title="TypeScript" %}

```typescript
import { Queue } from 'bullmq';

Expand All @@ -18,18 +21,51 @@ const counts = await myQueue.getJobCounts('wait', 'completed', 'failed');
// Returns an object like this { wait: number, completed: number, failed: number }
```

{% endtab %}

{% tab title="Python" %}

```python
from bullmq import Queue

myQueue = Queue('Paint')

counts = await myQueue.getJobCounts('wait', 'completed', 'failed')

# Returns an object like this { wait: number, completed: number, failed: number }
```

{% endtab %}
{% endtabs %}

The available status are: _completed, failed, delayed, active, wait, waiting-children, prioritized, _paused_ and _repeat._

#### Get Jobs

It is also possible to retrieve the jobs with pagination style semantics. For example:

{% tabs %}
{% tab title="TypeScript" %}

```typescript
const completed = await myQueue.getJobs(['completed'], 0, 100, true);

// returns the oldest 100 jobs
```

{% endtab %}

{% tab title="Python" %}

```python
completed = await myQueue.getJobs(['completed'], 0, 100, True)

# returns the oldest 100 jobs
```

{% endtab %}
{% endtabs %}

## Read more:

* 💡 [Get Job Counts API Reference](https://api.docs.bullmq.io/classes/v4.Queue.html#getJobCounts)
Expand Down
100 changes: 100 additions & 0 deletions docs/gitbook/patterns/process-step-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

Sometimes, it is useful to break processor function into small pieces that will be processed depending on the previous executed step, we could handle this kind of logic by using switch blocks:

{% tabs %}
{% tab title="TypeScript" %}

```typescript
enum Step {
Initial,
Expand Down Expand Up @@ -41,6 +44,40 @@ const worker = new Worker(
);
```

{% endtab %}

{% tab title="Python" %}

```python
class Step(int, Enum):
Initial = 1
Second = 2
Finish = 3

async def process(job: Job, token: str):
step = job.data.get("step")
while step != Step.Finish:
if step == Step.Initial:
await doInitialStepStuff()
await job.updateData({
"step": Step.Second
})
step = Step.Second
elif step == Step.Second:
await doSecondStepStuff()
await job.updateData({
"step": Step.Finish
})
step = Step.Finish
else:
raise Exception("invalid step")

worker = Worker("queueName", process, {"connection": connection})
```

{% endtab %}
{% endtabs %}

As you can see, we should save the step value; in this case, we are saving it into the job's data. So even in the case of an error, it would be retried in the last step that was saved (in case we use a backoff strategy).

## Delaying
Expand Down Expand Up @@ -96,6 +133,9 @@ A common use case is to add children at runtime and then wait for the children t

This can be handled using the `moveToWaitingChildren` method. However, it is important to note that when a job is being processed by a worker, the worker keeps a lock on this job with a certain token value. For the `moveToWaitingChildren` method to work, we need to pass said token so that it can unlock without error. Finally, we need to exit from the processor by throwing a special error `WaitingChildrenError` that will signal the worker that the job has been moved to waiting-children so that it does not try to complete (or fail the job) instead.

{% tabs %}
{% tab title="TypeScript" %}

```typescript
import { WaitingChildrenError, Worker } from 'bullmq';

Expand Down Expand Up @@ -170,6 +210,66 @@ const worker = new Worker(
);
```

{% endtab %}

{% tab title="Python" %}

```python
from bullmq import Worker, WaitingChildrenError
from enum import Enum

class Step(int, Enum):
Initial = 1
Second = 2
Third = 3
Finish = 4

async def process(job: Job, token: str):
step = job.data.get("step")
while step != Step.Finish:
if step == Step.Initial:
await doInitialStepStuff()
await children_queue.add('child-1', {"foo": "bar" },{
"parent": {
"id": job.id,
"queue": job.queueQualifiedName
}
})
await job.updateData({
"step": Step.Second
})
step = Step.Second
elif step == Step.Second:
await doSecondStepStuff()
await children_queue.add('child-2', {"foo": "bar" },{
"parent": {
"id": job.id,
"queue": job.queueQualifiedName
}
})
await job.updateData({
"step": Step.Third
})
step = Step.Third
elif step == Step.Third:
should_wait = await job.moveToWaitingChildren(token, {})
if not should_wait:
await job.updateData({
"step": Step.Finish
})
step = Step.Finish
return Step.Finish
else:
raise WaitingChildrenError
else:
raise Exception("invalid step")

worker = Worker("parentQueueName", process, {"connection": connection})
```

{% endtab %}
{% endtabs %}

{% hint style="info" %}
Bullmq-Pro: this pattern could be handled by using observables; in that case, we do not need to save next step.
{% endhint %}
Expand Down

0 comments on commit f1ca525

Please sign in to comment.