From 15829c524480424ce29b24fc36c452b07e352124 Mon Sep 17 00:00:00 2001 From: Gwynne Raskind Date: Thu, 13 Jun 2024 18:05:22 -0500 Subject: [PATCH] Making Queues `Sendable` (#129) * The usual package tidying * First pass at making Queues properly Sendable. Known to be incomplete. * Some general code cleanup * More package tidying * Sendable-ness * Revamp QueuesCommand to do signal handling correctly, use structured logging, always cancel tasks even if they error, and never deadlock in job scheduling. * Remove unnecessary overloads from AsyncJob * Use makeFutureWithTask() and makeSucceededVoidFuture() consistently * Add utilities for firing notification hooks * Add some utilities to JobData * Package cleanup * Completely redo QueueWorker to be most async in implementation, do structured logging, do much better logging, check delays properly, clear data from unregistered jobs, count remaining attempts properly (and without resetting maxRetryCount every time), and always requeue for retries, and just generally be better. * Log instead of printing in RepeatedTask+Cancel * Lots of structured logging * Lots of general code cleanup and doc comments * Add `AsyncQueue` to allow creating queues drivers that use async methods. * Make all the tests fully async * Address PR feedback - Application.Queues.Storage needs to be actually Sendable-safe, use assert instead of precondition in JobData init, update queuedAt value for retries in QueueWorker, QueuesCommand doesn't need to be @unchcked, make QueuesDriver require `Sendable` * Update outdated doc comment * Add some missing tests, add AsyncTestQueueDriver to XCTQueues, respect LOG_LEVEL env var in tests --- .github/workflows/test.yml | 4 +- Package.swift | 45 +- Package@swift-5.9.swift | 54 ++ README.md | 39 +- Sources/Queues/Application+Queues.swift | 150 ++-- Sources/Queues/AsyncJob.swift | 69 +- Sources/Queues/AsyncJobEventDelegate.swift | 12 +- Sources/Queues/AsyncQueue.swift | 90 +++ Sources/Queues/AsyncScheduledJob.swift | 4 +- .../Docs.docc/Resources/vapor-queues-logo.svg | 21 + Sources/Queues/Docs.docc/theme-settings.json | 21 + Sources/Queues/Exports.swift | 21 +- Sources/Queues/Job.swift | 104 ++- Sources/Queues/JobData.swift | 36 +- Sources/Queues/JobIdentifier.swift | 3 +- Sources/Queues/NotificationHook.swift | 29 +- Sources/Queues/Queue+Async.swift | 21 - Sources/Queues/Queue.swift | 69 +- Sources/Queues/QueueContext.swift | 10 +- Sources/Queues/QueueName.swift | 14 +- Sources/Queues/QueueWorker.swift | 232 ++---- Sources/Queues/QueuesCommand.swift | 227 +++--- Sources/Queues/QueuesConfiguration.swift | 101 ++- Sources/Queues/QueuesDriver.swift | 13 +- .../Queues/QueuesEventLoopPreference.swift | 14 +- Sources/Queues/RepeatedTask+Cancel.swift | 9 +- Sources/Queues/Request+Queues.swift | 15 +- Sources/Queues/ScheduleBuilder.swift | 429 +++------- Sources/Queues/ScheduledJob.swift | 35 +- .../Docs.docc/Resources/vapor-queues-logo.svg | 21 + .../XCTQueues/Docs.docc/theme-settings.json | 21 + Sources/XCTQueues/TestQueueDriver.swift | 149 ++-- Tests/QueuesTests/AsyncQueueTests.swift | 63 +- Tests/QueuesTests/QueueTests.swift | 740 +++++++++++------- Tests/QueuesTests/ScheduleBuilderTests.swift | 9 +- Tests/QueuesTests/Utilities.swift | 15 + 36 files changed, 1588 insertions(+), 1321 deletions(-) create mode 100644 Package@swift-5.9.swift create mode 100644 Sources/Queues/AsyncQueue.swift create mode 100644 Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg create mode 100644 Sources/Queues/Docs.docc/theme-settings.json delete mode 100644 Sources/Queues/Queue+Async.swift create mode 100644 Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg create mode 100644 Sources/XCTQueues/Docs.docc/theme-settings.json create mode 100644 Tests/QueuesTests/Utilities.swift diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 48b8aa7..04c492a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: jobs: unit-tests: - uses: vapor/ci/.github/workflows/run-unit-tests.yml@reusable-workflows + uses: vapor/ci/.github/workflows/run-unit-tests.yml@main with: - with_coverage: true with_tsan: false + secrets: inherit diff --git a/Package.swift b/Package.swift index aec0cdf..e3d8f64 100644 --- a/Package.swift +++ b/Package.swift @@ -6,8 +6,8 @@ let package = Package( platforms: [ .macOS(.v10_15), .iOS(.v13), - .tvOS(.v13), .watchOS(.v6), + .tvOS(.v13), ], products: [ .library(name: "Queues", targets: ["Queues"]), @@ -15,20 +15,37 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), ], targets: [ - .target(name: "Queues", dependencies: [ - .product(name: "Vapor", package: "vapor"), - .product(name: "NIOCore", package: "swift-nio"), - ]), - .target(name: "XCTQueues", dependencies: [ - .target(name: "Queues") - ]), - .testTarget(name: "QueuesTests", dependencies: [ - .target(name: "Queues"), - .product(name: "XCTVapor", package: "vapor"), - .target(name: "XCTQueues") - ]), + .target( + name: "Queues", + dependencies: [ + .product(name: "Vapor", package: "vapor"), + .product(name: "NIOCore", package: "swift-nio"), + ], + swiftSettings: swiftSettings + ), + .target( + name: "XCTQueues", + dependencies: [ + .target(name: "Queues"), + ], + swiftSettings: swiftSettings + ), + .testTarget( + name: "QueuesTests", + dependencies: [ + .target(name: "Queues"), + .target(name: "XCTQueues"), + .product(name: "XCTVapor", package: "vapor"), + ], + swiftSettings: swiftSettings + ), ] ) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ConciseMagicFile"), +] } diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift new file mode 100644 index 0000000..1092a83 --- /dev/null +++ b/Package@swift-5.9.swift @@ -0,0 +1,54 @@ +// swift-tools-version:5.9 +import PackageDescription + +let package = Package( + name: "queues", + platforms: [ + .macOS(.v10_15), + .iOS(.v13), + .watchOS(.v6), + .tvOS(.v13), + ], + products: [ + .library(name: "Queues", targets: ["Queues"]), + .library(name: "XCTQueues", targets: ["XCTQueues"]) + ], + dependencies: [ + .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + ], + targets: [ + .target( + name: "Queues", + dependencies: [ + .product(name: "Vapor", package: "vapor"), + .product(name: "NIOCore", package: "swift-nio"), + ], + swiftSettings: swiftSettings + ), + .target( + name: "XCTQueues", + dependencies: [ + .target(name: "Queues"), + ], + swiftSettings: swiftSettings + ), + .testTarget( + name: "QueuesTests", + dependencies: [ + .target(name: "Queues"), + .target(name: "XCTQueues"), + .product(name: "XCTVapor", package: "vapor"), + ], + swiftSettings: swiftSettings + ), + ] +) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ExistentialAny"), + .enableUpcomingFeature("ConciseMagicFile"), + .enableUpcomingFeature("DisableOutwardActorInference"), + .enableExperimentalFeature("StrictConcurrency=complete"), +] } diff --git a/README.md b/README.md index fac4104..7c713f7 100644 --- a/README.md +++ b/README.md @@ -1,29 +1,18 @@

- Queues -
-
- - Docs - - - Team Chat - - - MIT License - - - Continuous Integration - - - Swift 5.6 - - - Swift 5.8 - + + + + Queues + +
+
+Documentation +Team Chat +MIT License +Continuous Integration + +Swift 5.8+

+
diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index dc7d914..021d069 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -2,39 +2,56 @@ import Foundation import Logging import Vapor import NIO +import NIOConcurrencyHelpers extension Application { - /// The `Queues` object + /// The application-global ``Queues`` accessor. public var queues: Queues { .init(application: self) } - /// Represents a `Queues` configuration object + /// Contains global configuration for queues and provides methods for registering jobs and retrieving queues. public struct Queues { - - /// The provider of the `Queues` configuration + /// A provider for a ``Queues`` driver. public struct Provider { - let run: (Application) -> () + let run: @Sendable (Application) -> () - public init(_ run: @escaping (Application) -> ()) { + public init(_ run: @escaping @Sendable (Application) -> ()) { self.run = run } } - final class Storage { - public var configuration: QueuesConfiguration - private (set) var commands: [QueuesCommand] - var driver: QueuesDriver? + final class Storage: Sendable { + private struct Box: Sendable { + var configuration: QueuesConfiguration + var commands: [QueuesCommand] + var driver: (any QueuesDriver)? + } + private let box: NIOLockedValueBox + + public var configuration: QueuesConfiguration { + get { self.box.withLockedValue { $0.configuration } } + set { self.box.withLockedValue { $0.configuration = newValue } } + } + var commands: [QueuesCommand] { self.box.withLockedValue { $0.commands } } + var driver: (any QueuesDriver)? { + get { self.box.withLockedValue { $0.driver } } + set { self.box.withLockedValue { $0.driver = newValue } } + } public init(_ application: Application) { - self.configuration = .init(logger: application.logger) - let command: QueuesCommand = .init(application: application) - self.commands = [command] - application.commands.use(command, as: "queues") + let command = QueuesCommand(application: application) + + self.box = .init(.init( + configuration: .init(logger: application.logger), + commands: [command], + driver: nil + )) + application.asyncCommands.use(command, as: "queues") } public func add(command: QueuesCommand) { - self.commands.append(command) + self.box.withLockedValue { $0.commands.append(command) } } } @@ -44,35 +61,26 @@ extension Application { struct Lifecycle: LifecycleHandler { func shutdown(_ application: Application) { - application.queues.storage.commands.forEach({$0.shutdown()}) - if let driver = application.queues.storage.driver { - driver.shutdown() - } + application.queues.storage.commands.forEach { $0.shutdown() } + application.queues.storage.driver?.shutdown() } func shutdownAsync(_ application: Application) async { for command in application.queues.storage.commands { await command.asyncShutdown() } - if let driver = application.queues.storage.driver { - await driver.asyncShutdown() - } + await application.queues.storage.driver?.asyncShutdown() } } - /// The `QueuesConfiguration` object + /// The ``QueuesConfiguration`` object. public var configuration: QueuesConfiguration { get { self.storage.configuration } nonmutating set { self.storage.configuration = newValue } } - /// Returns the default `Queue` - public var queue: Queue { - self.queue(.default) - } - - /// The selected `QueuesDriver` - public var driver: QueuesDriver { + /// The selected ``QueuesDriver``. + public var driver: any QueuesDriver { guard let driver = self.storage.driver else { fatalError("No Queues driver configured. Configure with app.queues.use(...)") } @@ -88,7 +96,13 @@ extension Application { public let application: Application - /// Returns a `JobsQueue` + /// Get the default ``Queue``. + public var queue: any Queue { + self.queue(.default) + } + + /// Create or look up an instance of a named ``Queue``. + /// /// - Parameters: /// - name: The name of the queue /// - logger: A logger object @@ -96,71 +110,75 @@ extension Application { public func queue( _ name: QueueName, logger: Logger? = nil, - on eventLoop: EventLoop? = nil - ) -> Queue { - return self.driver.makeQueue( - with: .init( - queueName: name, - configuration: self.configuration, - application: self.application, - logger: logger ?? self.application.logger, - on: eventLoop ?? self.application.eventLoopGroup.next() - ) - ) + on eventLoop: (any EventLoop)? = nil + ) -> any Queue { + self.driver.makeQueue(with: .init( + queueName: name, + configuration: self.configuration, + application: self.application, + logger: logger ?? self.application.logger, + on: eventLoop ?? self.application.eventLoopGroup.any() + )) } - /// Adds a new queued job - /// - Parameter job: The job to add - public func add(_ job: J) where J: Job { + /// Add a new queueable job. + /// + /// This must be called once for each job type that can be queued. + /// + /// - Parameter job: The job to add. + public func add(_ job: some Job) { self.configuration.add(job) } - /// Adds a new notification hook - /// - Parameter hook: The hook object to add - public func add(_ hook: N) where N: JobEventDelegate { + /// Add a new notification hook. + /// + /// - Parameter hook: The hook to add. + public func add(_ hook: some JobEventDelegate) { self.configuration.add(hook) } - /// Choose which provider to use - /// - Parameter provider: The provider + /// Choose which provider to use. + /// + /// - Parameter provider: The provider. public func use(_ provider: Provider) { provider.run(self.application) } - /// Choose which driver to use + /// Configure a driver. + /// /// - Parameter driver: The driver - public func use(custom driver: QueuesDriver) { + public func use(custom driver: any QueuesDriver) { self.storage.driver = driver } - /// Schedule a new job - /// - Parameter job: The job to schedule - public func schedule(_ job: J) -> ScheduleBuilder - where J: ScheduledJob - { - let builder = ScheduleBuilder() - _ = self.storage.configuration.schedule(job, builder: builder) - return builder + /// Schedule a new job. + /// + /// - Parameter job: The job to schedule. + public func schedule(_ job: some ScheduledJob) -> ScheduleBuilder { + self.storage.configuration.schedule(job) } - /// Starts an in-process worker to dequeue and run jobs - /// - Parameter queue: The queue to run the jobs on. Defaults to `default` + /// Starts an in-process worker to dequeue and run jobs. + /// + /// - Parameter queue: The queue to run the jobs on. Defaults to ``QueueName/default``. public func startInProcessJobs(on queue: QueueName = .default) throws { - let inProcessJobs = QueuesCommand(application: application, scheduled: false) + let inProcessJobs = QueuesCommand(application: self.application) + try inProcessJobs.startJobs(on: queue) self.storage.add(command: inProcessJobs) } - /// Starts an in-process worker to run scheduled jobs + /// Starts an in-process worker to run scheduled jobs. public func startScheduledJobs() throws { - let scheduledJobs = QueuesCommand(application: application, scheduled: true) + let scheduledJobs = QueuesCommand(application: self.application) + try scheduledJobs.startScheduledJobs() self.storage.add(command: scheduledJobs) } func initialize() { self.application.lifecycle.use(Lifecycle()) - self.application.storage[Key.self] = .init(application) + self.application.storage[Key.self] = .init(self.application) } } } diff --git a/Sources/Queues/AsyncJob.swift b/Sources/Queues/AsyncJob.swift index 197e77e..ba84375 100644 --- a/Sources/Queues/AsyncJob.swift +++ b/Sources/Queues/AsyncJob.swift @@ -4,88 +4,49 @@ import Foundation /// A task that can be queued for future execution. public protocol AsyncJob: Job { - /// The data associated with a job associatedtype Payload /// Called when it's this Job's turn to be dequeued. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services - /// - payload: The data for this handler + /// - context: The ``QueueContext``. + /// - payload: The typed job payload. func dequeue( _ context: QueueContext, _ payload: Payload ) async throws /// Called when there is an error at any stage of the Job's execution. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services + /// - context: The ``QueueContext``. /// - error: The error returned by the job. - /// - payload: The typed payload for the job + /// - payload: The typed job payload. func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) async throws - - /// Called when there was an error and job will be retired. - /// - /// - Parameters: - /// - attempt: Number of job attempts which failed - /// - Returns: Number of seconds for which next retry will be delayed. - /// Return `-1` if you want to retry job immediately without putting it back to the queue. - func nextRetryIn(attempt: Int) -> Int - - static func serializePayload(_ payload: Payload) throws -> [UInt8] - static func parsePayload(_ bytes: [UInt8]) throws -> Payload } -extension AsyncJob where Payload: Codable { - - /// Serialize a payload into Data - /// - Parameter payload: The payload - public static func serializePayload(_ payload: Payload) throws -> [UInt8] { - try .init(JSONEncoder().encode(payload)) - } - - /// Parse bytes into the payload - /// - Parameter bytes: The Payload - public static func parsePayload(_ bytes: [UInt8]) throws -> Payload { - try JSONDecoder().decode(Payload.self, from: .init(bytes)) - } +extension AsyncJob { + /// Default implementation of ``AsyncJob/error(_:_:_:)-8627d``. + public func error(_ context: QueueContext, _ error: any Error, _ payload: Payload) async throws {} } extension AsyncJob { - /// The jobName of the Job - public static var name: String { - return String(describing: Self.self) - } - - /// See `Job`.`nextRetryIn` - public func nextRetryIn(attempt: Int) -> Int { - return -1 - } - - public func _nextRetryIn(attempt: Int) -> Int { - return nextRetryIn(attempt: attempt) - } - + /// Forward ``Job/dequeue(_:_:)`` to ``AsyncJob/dequeue(_:_:)-9g26t``. public func dequeue(_ context: QueueContext, _ payload: Payload) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + context.eventLoop.makeFutureWithTask { try await self.dequeue(context, payload) } - return promise.futureResult } - public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + /// Forward ``Job/error(_:_:_:)-2brrj`` to ``AsyncJob/error(_:_:_:)-8627d`` + public func error(_ context: QueueContext, _ error: any Error, _ payload: Payload) -> EventLoopFuture { + context.eventLoop.makeFutureWithTask { try await self.error(context, error, payload) } - return promise.futureResult } - public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) async throws { - return - } } diff --git a/Sources/Queues/AsyncJobEventDelegate.swift b/Sources/Queues/AsyncJobEventDelegate.swift index d76fad4..6aa1711 100644 --- a/Sources/Queues/AsyncJobEventDelegate.swift +++ b/Sources/Queues/AsyncJobEventDelegate.swift @@ -21,34 +21,34 @@ public protocol AsyncJobEventDelegate: JobEventDelegate { /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail - func error(jobId: String, error: Error) async throws + func error(jobId: String, error: any Error) async throws } extension AsyncJobEventDelegate { public func dispatched(job: JobEventData) async throws { } public func didDequeue(jobId: String) async throws { } public func success(jobId: String) async throws { } - public func error(jobId: String, error: Error) async throws { } + public func error(jobId: String, error: any Error) async throws { } - public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + public func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.dispatched(job: job) } } - public func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { + public func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.didDequeue(jobId: jobId) } } - public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { + public func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.success(jobId: jobId) } } - public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { + public func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.error(jobId: jobId, error: error) } diff --git a/Sources/Queues/AsyncQueue.swift b/Sources/Queues/AsyncQueue.swift new file mode 100644 index 0000000..5593408 --- /dev/null +++ b/Sources/Queues/AsyncQueue.swift @@ -0,0 +1,90 @@ +import Foundation +import Vapor +import NIOCore + +public protocol AsyncQueue: Queue { + /// The job context + var context: QueueContext { get } + + /// Gets the next job to be run + /// - Parameter id: The ID of the job + func get(_ id: JobIdentifier) async throws -> JobData + + /// Sets a job that should be run in the future + /// - Parameters: + /// - id: The ID of the job + /// - data: Data for the job + func set(_ id: JobIdentifier, to data: JobData) async throws + + /// Removes a job from the queue + /// - Parameter id: The ID of the job + func clear(_ id: JobIdentifier) async throws + + /// Pops the next job in the queue + func pop() async throws -> JobIdentifier? + + /// Pushes the next job into a queue + /// - Parameter id: The ID of the job + func push(_ id: JobIdentifier) async throws +} + +extension AsyncQueue { + public func get(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.get(id) } + } + + public func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.set(id, to: data) } + } + + public func clear(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.clear(id) } + } + + public func pop() -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.pop() } + } + + public func push(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.push(id) } + } +} + +extension Queue { + /// Dispatch a job into the queue for processing + /// - Parameters: + /// - job: The Job type + /// - payload: The payload data to be dispatched + /// - maxRetryCount: Number of times to retry this job on failure + /// - delayUntil: Delay the processing of this job until a certain date + public func dispatch( + _ job: J.Type, + _ payload: J.Payload, + maxRetryCount: Int = 0, + delayUntil: Date? = nil, + id: JobIdentifier = .init() + ) async throws { + var logger = self.logger + logger[metadataKey: "queue"] = "\(self.queueName.string)" + logger[metadataKey: "job-id"] = "\(id.string)" + logger[metadataKey: "job-name"] = "\(J.name)" + + let storage = JobData( + payload: try J.serializePayload(payload), + maxRetryCount: maxRetryCount, + jobName: J.name, + delayUntil: delayUntil, + queuedAt: .init() + ) + + logger.trace("Storing job data") + try await self.set(id, to: storage).get() + logger.trace("Pusing job to queue") + try await self.push(id).get() + logger.info("Dispatched job") + + await self.sendNotification(of: "dispatch", logger: logger) { + try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get() + } + } +} diff --git a/Sources/Queues/AsyncScheduledJob.swift b/Sources/Queues/AsyncScheduledJob.swift index 14af89f..1041eaa 100644 --- a/Sources/Queues/AsyncScheduledJob.swift +++ b/Sources/Queues/AsyncScheduledJob.swift @@ -15,10 +15,8 @@ extension AsyncScheduledJob { public var name: String { "\(Self.self)" } public func run(context: QueueContext) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + context.eventLoop.makeFutureWithTask { try await self.run(context: context) } - return promise.futureResult } } diff --git a/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg b/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg new file mode 100644 index 0000000..f40f693 --- /dev/null +++ b/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + diff --git a/Sources/Queues/Docs.docc/theme-settings.json b/Sources/Queues/Docs.docc/theme-settings.json new file mode 100644 index 0000000..1e0af2e --- /dev/null +++ b/Sources/Queues/Docs.docc/theme-settings.json @@ -0,0 +1,21 @@ +{ + "theme": { + "aside": { "border-radius": "16px", "border-style": "double", "border-width": "3px" }, + "border-radius": "0", + "button": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "code": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "color": { + "queues": "#e8665a", + "documentation-intro-fill": "radial-gradient(circle at top, var(--color-queues) 30%, #000 100%)", + "documentation-intro-accent": "var(--color-queues)", + "logo-base": { "dark": "#fff", "light": "#000" }, + "logo-shape": { "dark": "#000", "light": "#fff" }, + "fill": { "dark": "#000", "light": "#fff" } + }, + "icons": { "technology": "/queues/images/vapor-queues-logo.svg" } + }, + "features": { + "quickNavigation": { "enable": true }, + "i18n": { "enable": true } + } +} diff --git a/Sources/Queues/Exports.swift b/Sources/Queues/Exports.swift index 3d38063..1008b89 100644 --- a/Sources/Queues/Exports.swift +++ b/Sources/Queues/Exports.swift @@ -1,19 +1,6 @@ -#if swift(>=5.8) - @_documentation(visibility: internal) @_exported import struct Foundation.Date @_documentation(visibility: internal) @_exported import struct Logging.Logger -@_documentation(visibility: internal) @_exported import class NIO.EventLoopFuture -@_documentation(visibility: internal) @_exported import struct NIO.EventLoopPromise -@_documentation(visibility: internal) @_exported import protocol NIO.EventLoop -@_documentation(visibility: internal) @_exported import struct NIO.TimeAmount - -#else - -@_exported import struct Foundation.Date -@_exported import struct Logging.Logger -@_exported import class NIO.EventLoopFuture -@_exported import struct NIO.EventLoopPromise -@_exported import protocol NIO.EventLoop -@_exported import struct NIO.TimeAmount - -#endif +@_documentation(visibility: internal) @_exported import class NIOCore.EventLoopFuture +@_documentation(visibility: internal) @_exported import struct NIOCore.EventLoopPromise +@_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoop +@_documentation(visibility: internal) @_exported import struct NIOCore.TimeAmount diff --git a/Sources/Queues/Job.swift b/Sources/Queues/Job.swift index 80dfc68..7cd2937 100644 --- a/Sources/Queues/Job.swift +++ b/Sources/Queues/Job.swift @@ -6,105 +6,129 @@ import Vapor /// A task that can be queued for future execution. public protocol Job: AnyJob { /// The data associated with a job - associatedtype Payload + associatedtype Payload: Sendable /// Called when it's this Job's turn to be dequeued. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services - /// - payload: The data for this handler + /// - context: The ``QueueContext``. + /// - payload: The typed job payload. func dequeue( _ context: QueueContext, _ payload: Payload ) -> EventLoopFuture /// Called when there is an error at any stage of the Job's execution. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services + /// - context: The ``QueueContext``. /// - error: The error returned by the job. - /// - payload: The typed payload for the job + /// - payload: The typed job payload. func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) -> EventLoopFuture - /// Called when there was an error and job will be retired. + /// Called when there was an error and the job will be retried. /// - /// - Parameters: - /// - attempt: Number of job attempts which failed - /// - Returns: Number of seconds for which next retry will be delayed. - /// Return `-1` if you want to retry job immediately without putting it back to the queue. + /// - Parameter attempt: Number of job attempts which have failed so far. + /// - Returns: Number of seconds to delay the next retry. Return `0` to place the job back on the queue with no + /// delay added. If this method is not implemented, the default is `0`. Returning `-1` is the same as `0`. func nextRetryIn(attempt: Int) -> Int + /// Serialize a typed payload to an array of bytes. When `Payload` is `Codable`, this method will default to + /// encoding to JSON. + /// + /// - Parameter payload: The payload to serialize. + /// - Returns: The array of serialized bytes. static func serializePayload(_ payload: Payload) throws -> [UInt8] + + /// Deserialize an array of bytes into a typed payload. When `Payload` is `Codable`, this method will default to + /// decoding from JSON. + /// + /// - Parameter bytes: The serialized bytes to decode. + /// - Returns: A decoded payload. static func parsePayload(_ bytes: [UInt8]) throws -> Payload } -extension Job where Payload: Codable { - - /// Serialize a payload into Data - /// - Parameter payload: The payload +extension Job where Payload: Codable { + /// Default implementation for ``Job/serializePayload(_:)-4uro2``. public static func serializePayload(_ payload: Payload) throws -> [UInt8] { try .init(JSONEncoder().encode(payload)) } - /// Parse bytes into the payload - /// - Parameter bytes: The Payload + /// Default implementation for ``Job/parsePayload(_:)-9tn3a``. public static func parsePayload(_ bytes: [UInt8]) throws -> Payload { try JSONDecoder().decode(Payload.self, from: .init(bytes)) } } extension Job { - /// The jobName of the Job + /// Default implementation for ``AnyJob/name``. public static var name: String { - return String(describing: Self.self) + String(describing: Self.self) } - /// See `Job`.`error` + /// Default implementation for ``Job/error(_:_:_:)-jzgw``. public func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) -> EventLoopFuture { - context.eventLoop.makeSucceededFuture(()) + context.eventLoop.makeSucceededVoidFuture() } - /// See `Job`.`nextRetryIn` + /// Default implementation for ``Job/nextRetryIn(attempt:)-5gc93``. public func nextRetryIn(attempt: Int) -> Int { - return -1 + 0 } +} +/// A type-erased version of ``Job``. +public protocol AnyJob: Sendable { + /// The name of the job. + static var name: String { get } + + /// Perform ``Job/dequeue(_:_:)`` after deserializing the raw payload bytes. + func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture + + /// Perform ``Job/error(_:_:_:)-2brrj`` after deserializing the raw payload bytes. + func _error(_ context: QueueContext, id: String, _ error: any Error, payload: [UInt8]) -> EventLoopFuture + + /// Type-erased accessor for ``Job/nextRetryIn(attempt:)-5gc93``. + func _nextRetryIn(attempt: Int) -> Int +} + +// N.B. These should really not be public. +extension Job { + // See `AnyJob._nextRetryIn(attempt:)`. public func _nextRetryIn(attempt: Int) -> Int { - return nextRetryIn(attempt: attempt) + self.nextRetryIn(attempt: attempt) } - public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture { - var contextCopy = context - contextCopy.logger[metadataKey: "job_id"] = .string(id) + // See `AnyJob._error(_:id:_:payload:)`. + public func _error(_ context: QueueContext, id: String, _ error: any Error, payload: [UInt8]) -> EventLoopFuture { + var context = context + context.logger[metadataKey: "queue"] = "\(context.queueName.string)" + context.logger[metadataKey: "job_id"] = "\(id)" do { - return try self.error(contextCopy, error, Self.parsePayload(payload)) + return try self.error(context, error, Self.parsePayload(payload)) } catch { return context.eventLoop.makeFailedFuture(error) } } + // See `AnyJob._dequeue(_:id:payload:)`. public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture { - var contextCopy = context - contextCopy.logger[metadataKey: "job_id"] = .string(id) + var context = context + context.logger[metadataKey: "queue"] = "\(context.queueName.string)" + context.logger[metadataKey: "job_id"] = "\(id)" do { - return try self.dequeue(contextCopy, Self.parsePayload(payload)) + return try self.dequeue(context, Self.parsePayload(payload)) } catch { return context.eventLoop.makeFailedFuture(error) } } } -/// A type-erased version of `Job` -public protocol AnyJob { - /// The name of the `Job` - static var name: String { get } - func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture - func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture - func _nextRetryIn(attempt: Int) -> Int -} diff --git a/Sources/Queues/JobData.swift b/Sources/Queues/JobData.swift index ba228dd..ab32c33 100644 --- a/Sources/Queues/JobData.swift +++ b/Sources/Queues/JobData.swift @@ -1,7 +1,7 @@ import Foundation /// Holds information about the Job that is to be encoded to the persistence store. -public struct JobData: Codable { +public struct JobData: Codable, Sendable { /// The job data to be encoded. public let payload: [UInt8] @@ -29,6 +29,9 @@ public struct JobData: Codable { queuedAt: Date, attempts: Int = 0 ) { + assert(maxRetryCount >= 0) + assert(attempts >= 0) + self.payload = payload self.maxRetryCount = maxRetryCount self.jobName = jobName @@ -37,3 +40,34 @@ public struct JobData: Codable { self.attempts = attempts } } + +// N.B.: These methods are intended for internal use only. +extension JobData { + /// The non-`nil` number of attempts made to run this job (how many times has it failed). + /// This can also be treated as a "retry" count. + /// + /// Value | Meaning + /// -|- + /// 0 | The job has never run, or succeeded on its first attempt + /// 1 | The job has failed once and is queued for its first retry + /// 2... | The job has failed N times and is queued for its Nth retry + var failureCount: Int { + self.attempts ?? 0 + } + + /// The number of retries left iff the current (re)try fails. + var remainingAttempts: Int { + Swift.max(0, self.maxRetryCount - self.failureCount) + } + + /// The current attempt number. + /// + /// Value|Meaning + /// -|- + /// 0|Not valid + /// 1|The job has not failed thus far; this the first attempt. + /// 2|The job has failed once; this is the second attempt. + var currentAttempt: Int { + self.failureCount + 1 + } +} diff --git a/Sources/Queues/JobIdentifier.swift b/Sources/Queues/JobIdentifier.swift index c811d14..3739220 100644 --- a/Sources/Queues/JobIdentifier.swift +++ b/Sources/Queues/JobIdentifier.swift @@ -1,8 +1,7 @@ import struct Foundation.UUID /// An identifier for a job -public struct JobIdentifier: Hashable, Equatable { - +public struct JobIdentifier: Hashable, Equatable, Sendable { /// The string value of the ID public let string: String diff --git a/Sources/Queues/NotificationHook.swift b/Sources/Queues/NotificationHook.swift index ea317d9..54724e6 100644 --- a/Sources/Queues/NotificationHook.swift +++ b/Sources/Queues/NotificationHook.swift @@ -2,55 +2,54 @@ import NIOCore import Foundation /// Represents an object that can receive notifications about job statuses -public protocol JobEventDelegate { - +public protocol JobEventDelegate: Sendable { /// Called when the job is first dispatched /// - Parameters: /// - job: The `JobData` associated with the job /// - eventLoop: The eventLoop - func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture + func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job is dequeued /// - Parameters: /// - jobId: The id of the Job /// - eventLoop: The eventLoop - func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job succeeds /// - Parameters: /// - jobId: The id of the Job /// - eventLoop: The eventLoop - func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job returns an error /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail /// - eventLoop: The eventLoop - func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture + func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture } extension JobEventDelegate { - public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } } /// Data on a job sent via a notification -public struct JobEventData { +public struct JobEventData: Sendable { /// The id of the job, assigned at dispatch public var id: String diff --git a/Sources/Queues/Queue+Async.swift b/Sources/Queues/Queue+Async.swift deleted file mode 100644 index 71f1775..0000000 --- a/Sources/Queues/Queue+Async.swift +++ /dev/null @@ -1,21 +0,0 @@ -import Foundation -import Vapor -import NIOCore - -extension Queue { - /// Dispatch a job into the queue for processing - /// - Parameters: - /// - job: The Job type - /// - payload: The payload data to be dispatched - /// - maxRetryCount: Number of times to retry this job on failure - /// - delayUntil: Delay the processing of this job until a certain date - public func dispatch( - _ job: J.Type, - _ payload: J.Payload, - maxRetryCount: Int = 0, - delayUntil: Date? = nil, - id: JobIdentifier = JobIdentifier() - ) async throws where J: Job { - try await self.dispatch(job, payload, maxRetryCount: maxRetryCount, delayUntil: delayUntil, id: id).get() - } -} diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index cfef68f..efee13e 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -3,7 +3,7 @@ import Logging import Foundation /// A type that can store and retrieve jobs from a persistence layer -public protocol Queue { +public protocol Queue: Sendable { /// The job context var context: QueueContext { get } @@ -31,7 +31,7 @@ public protocol Queue { extension Queue { /// The EventLoop for a job queue - public var eventLoop: EventLoop { + public var eventLoop: any EventLoop { self.context.eventLoop } @@ -61,22 +61,26 @@ extension Queue { /// - payload: The payload data to be dispatched /// - maxRetryCount: Number of times to retry this job on failure /// - delayUntil: Delay the processing of this job until a certain date - public func dispatch( + public func dispatch( _ job: J.Type, _ payload: J.Payload, maxRetryCount: Int = 0, delayUntil: Date? = nil, - id: JobIdentifier = JobIdentifier() - ) -> EventLoopFuture - where J: Job - { + id: JobIdentifier = .init() + ) -> EventLoopFuture { + var logger_ = self.logger + logger_[metadataKey: "queue"] = "\(self.queueName.string)" + logger_[metadataKey: "job-id"] = "\(id.string)" + logger_[metadataKey: "job-name"] = "\(J.name)" + let logger = logger_ + let bytes: [UInt8] do { bytes = try J.serializePayload(payload) } catch { return self.eventLoop.makeFailedFuture(error) } - logger.trace("Serialized bytes for payload: \(bytes)") + let storage = JobData( payload: bytes, maxRetryCount: maxRetryCount, @@ -84,21 +88,44 @@ extension Queue { delayUntil: delayUntil, queuedAt: Date() ) - logger.trace("Adding the ID to the storage") + + logger.trace("Storing job data") return self.set(id, to: storage).flatMap { - self.push(id) - }.flatMap { _ in - self.logger.info("Dispatched queue job", metadata: [ - "job_id": .string(id.string), - "job_name": .string(job.name), - "queue": .string(self.queueName.string) - ]) + logger.trace("Pusing job to queue") + return self.push(id) + }.flatMapWithEventLoop { _, eventLoop in + logger.info("Dispatched job") + return self.sendNotification(of: "dispatch", logger: logger) { + $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop) + } + } + } +} + +extension Queue { + func sendNotification( + of kind: String, logger: Logger, + _ notification: @escaping @Sendable (_ hook: any JobEventDelegate) -> EventLoopFuture + ) -> EventLoopFuture { + logger.trace("Sending notification", metadata: ["kind": "\(kind)"]) + return self.configuration.notificationHooks.map { + notification($0).flatMapErrorWithEventLoop { error, eventLoop in + logger.warning("Failed to send notification", metadata: ["kind": "\(kind)", "error": "\(error)"]) + return eventLoop.makeSucceededVoidFuture() + } + }.flatten(on: self.eventLoop) + } - return self.configuration.notificationHooks.map { - $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop) - }.flatten(on: self.eventLoop).flatMapError { error in - self.logger.error("Could not send dispatched notification: \(error)") - return self.eventLoop.future() + func sendNotification( + of kind: String, logger: Logger, + _ notification: @escaping @Sendable (_ hook: any JobEventDelegate) async throws -> Void + ) async { + logger.trace("Sending notification", metadata: ["kind": "\(kind)"]) + for hook in self.configuration.notificationHooks { + do { + try await notification(hook) + } catch { + logger.warning("Failed to send notification", metadata: ["kind": "\(kind)", "error": "\(error)"]) } } } diff --git a/Sources/Queues/QueueContext.swift b/Sources/Queues/QueueContext.swift index 6ef924e..bad1db9 100644 --- a/Sources/Queues/QueueContext.swift +++ b/Sources/Queues/QueueContext.swift @@ -3,7 +3,7 @@ import NIOCore import Vapor /// The context for a queue. -public struct QueueContext { +public struct QueueContext: Sendable { /// The name of the queue public let queueName: QueueName @@ -17,7 +17,7 @@ public struct QueueContext { public var logger: Logger /// An event loop to run the process on - public let eventLoop: EventLoop + public let eventLoop: any EventLoop /// Creates a new JobContext /// - Parameters: @@ -31,7 +31,7 @@ public struct QueueContext { configuration: QueuesConfiguration, application: Application, logger: Logger, - on eventLoop: EventLoop + on eventLoop: any EventLoop ) { self.queueName = queueName self.configuration = configuration @@ -41,13 +41,13 @@ public struct QueueContext { } /// Returns the default job `Queue` - public var queue: Queue { + public var queue: any Queue { self.queues(.default) } /// Returns the specific job `Queue` for the given queue name /// - Parameter queue: The queue name - public func queues(_ queue: QueueName) -> Queue { + public func queues(_ queue: QueueName) -> any Queue { self.application.queues.queue( queue, logger: self.logger, diff --git a/Sources/Queues/QueueName.swift b/Sources/Queues/QueueName.swift index a7bfe8b..1941666 100644 --- a/Sources/Queues/QueueName.swift +++ b/Sources/Queues/QueueName.swift @@ -1,23 +1,23 @@ /// A specific queue that jobs are run on. -public struct QueueName { +public struct QueueName: Sendable { /// The default queue that jobs are run on - public static let `default` = QueueName(string: "default") + public static let `default` = Self(string: "default") /// The name of the queue public let string: String - /// Creates a new `QueueType` + /// Creates a new ``QueueName`` /// - /// - Parameter name: The name of the `QueueType` + /// - Parameter name: The name of the queue public init(string: String) { self.string = string } /// Makes the name of the queue /// - /// - Parameter persistanceKey: The base persistence key + /// - Parameter persistenceKey: The base persistence key /// - Returns: A string of the queue's fully qualified name - public func makeKey(with persistanceKey: String) -> String { - return persistanceKey + "[\(self.string)]" + public func makeKey(with persistenceKey: String) -> String { + "\(persistenceKey)[\(self.string)]" } } diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index b375188..fe76246 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -8,182 +8,94 @@ extension Queue { } } -/// The worker that runs the `Job` -public struct QueueWorker { - let queue: Queue +/// The worker that runs ``Job``s. +public struct QueueWorker: Sendable { + let queue: any Queue - init(queue: Queue) { - self.queue = queue + /// Actually run the queue. This is a thin wrapper for ELF-style callers. + public func run() -> EventLoopFuture { + self.queue.eventLoop.makeFutureWithTask { + try await self.run() + } } - /// Logic to run the queue - public func run() -> EventLoopFuture { - queue.logger.trace("Popping job from queue") - return self.queue.pop().flatMap { id in - //No job found, go to the next iteration - guard let id = id else { - self.queue.logger.trace("Did not receive ID from pop") - return self.queue.eventLoop.makeSucceededFuture(()) - } - - self.queue.logger.trace("Received job \(id)") - self.queue.logger.trace("Getting data for job \(id)") - - return self.queue.get(id).flatMap { data in - var logger = self.queue.logger - logger[metadataKey: "job_id"] = .string(id.string) - - logger.trace("Received job data for \(id): \(data)") - // If the job has a delay, we must check to make sure we can execute. - // If the delay has not passed yet, requeue the job - if let delay = data.delayUntil, delay >= Date() { - logger.trace("Requeueing job \(id) for execution later because the delayUntil value of \(delay) has not passed yet") - return self.queue.push(id) - } + /// Pop a job off the queue and try to run it. If no jobs are available, do nothing. + public func run() async throws { + var logger = self.queue.logger + logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" + logger.trace("Popping job from queue") + + guard let id = try await self.queue.pop().get() else { + // No job found, go around again. + return logger.trace("No pending jobs") + } - guard let job = self.queue.configuration.jobs[data.jobName] else { - logger.error("No job named \(data.jobName) is registered") - return self.queue.eventLoop.makeSucceededFuture(()) - } + logger[metadataKey: "job-id"] = "\(id.string)" + logger.trace("Found pending job") + + let data = try await self.queue.get(id).get() + logger.trace("Received job data", metadata: ["job-data": "\(data)"]) + logger[metadataKey: "job-name"] = "\(data.jobName)" - logger.trace("Sending dequeued notification hooks") + guard let job = self.queue.configuration.jobs[data.jobName] else { + logger.warning("No job with the desired name is registered, discarding") + return try await self.queue.clear(id).get() + } - return self.queue.configuration.notificationHooks.map { - $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop) - }.flatten(on: self.queue.eventLoop).flatMapError { error in - logger.error("Could not send didDequeue notification: \(error)") - return self.queue.eventLoop.future() - }.flatMap { _ in - logger.info("Dequeueing job", metadata: [ - "job_id": .string(id.string), - "job_name": .string(data.jobName), - "queue": .string(self.queue.queueName.string) - ]) + // If the job has a delay that isn't up yet, requeue it. + guard (data.delayUntil ?? .distantPast) < Date() else { + logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"]) + return try await self.queue.push(id).get() + } - return self.run( - id: id, - name: data.jobName, - job: job, - payload: data.payload, - logger: logger, - remainingTries: data.maxRetryCount, - attempts: data.attempts, - jobData: data - ) - } - } + await self.queue.sendNotification(of: "dequeue", logger: logger) { + try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } + + try await self.run(id: id, job: job, jobData: data, logger: logger) } - private func run( - id: JobIdentifier, - name: String, - job: AnyJob, - payload: [UInt8], - logger: Logger, - remainingTries: Int, - attempts: Int?, - jobData: JobData - ) -> EventLoopFuture { - logger.trace("Running the queue job (remaining tries: \(remainingTries)") - let futureJob = job._dequeue(self.queue.context, id: id.string, payload: payload) - return futureJob.flatMap { complete in - logger.trace("Ran job successfully") - logger.trace("Sending success notification hooks") - return self.queue.configuration.notificationHooks.map { - $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop) - }.flatten(on: self.queue.context.eventLoop).flatMapError { error in - self.queue.logger.error("Could not send success notification: \(error)") - return self.queue.context.eventLoop.future() - }.flatMap { - logger.trace("Job done being run") - return self.queue.clear(id) + private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { + logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"]) + do { + try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get() + + logger.trace("Job ran successfully", metadata: ["attempts-made": "\(jobData.currentAttempt)"]) + await self.queue.sendNotification(of: "success", logger: logger) { + try await $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).get() } - }.flatMapError { error in - logger.trace("Job failed (remaining tries: \(remainingTries)") - if remainingTries == 0 { - logger.error("Job failed with error: \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) + } catch { + if jobData.remainingAttempts > 0 { + // N.B.: `return` from here so we don't clear the job data. + return try await self.retry(id: id, job: job, jobData: jobData, error: error, logger: logger) + } else { + logger.warning("Job failed, no retries remaining", metadata: ["error": "\(error)", "attempts-made": "\(jobData.currentAttempt)"]) - logger.trace("Sending failure notification hooks") - return job._error(self.queue.context, id: id.string, error, payload: payload).flatMap { _ in - return self.queue.configuration.notificationHooks.map { - $0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop) - }.flatten(on: self.queue.context.eventLoop).flatMapError { error in - self.queue.logger.error("Failed to send error notification: \(error)") - return self.queue.context.eventLoop.future() - }.flatMap { - logger.trace("Job done being run") - return self.queue.clear(id) - } + try await job._error(self.queue.context, id: id.string, error, payload: jobData.payload).get() + await self.queue.sendNotification(of: "failure", logger: logger) { + try await $0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop).get() } - } else { - return self.retry( - id: id, - name: name, - job: job, - payload: payload, - logger: logger, - remainingTries: remainingTries, - attempts: attempts, - jobData: jobData, - error: error - ) } } + try await self.queue.clear(id).get() } - private func retry( - id: JobIdentifier, - name: String, - job: AnyJob, - payload: [UInt8], - logger: Logger, - remainingTries: Int, - attempts: Int?, - jobData: JobData, - error: Error - ) -> EventLoopFuture { - let attempts = attempts ?? 0 - let delayInSeconds = job._nextRetryIn(attempt: attempts + 1) - if delayInSeconds == -1 { - logger.error("Job failed, retrying... \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) - return self.run( - id: id, - name: name, - job: job, - payload: payload, - logger: logger, - remainingTries: remainingTries - 1, - attempts: attempts + 1, - jobData: jobData - ) - } else { - logger.error("Job failed, retrying in \(delayInSeconds)s... \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) - let storage = JobData( - payload: jobData.payload, - maxRetryCount: remainingTries - 1, - jobName: jobData.jobName, - delayUntil: Date(timeIntervalSinceNow: Double(delayInSeconds)), - queuedAt: jobData.queuedAt, - attempts: attempts + 1 - ) - return self.queue.clear(id).flatMap { - self.queue.set(id, to: storage) - }.flatMap { - self.queue.push(id) - } - } + private func retry(id: JobIdentifier, job: any AnyJob, jobData: JobData, error: any Error, logger: Logger) async throws { + let delay = Swift.max(0, job._nextRetryIn(attempt: jobData.currentAttempt)) + let updatedData = JobData( + payload: jobData.payload, + maxRetryCount: jobData.maxRetryCount, + jobName: jobData.jobName, + delayUntil: .init(timeIntervalSinceNow: Double(delay)), + queuedAt: .init(), + attempts: jobData.currentAttempt + ) + + logger.warning("Job failed, retrying", metadata: [ + "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)" + ]) + try await self.queue.clear(id).get() + try await self.queue.set(id, to: updatedData).get() + try await self.queue.push(id).get() } } diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index cd2cada..a242d35 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -1,23 +1,18 @@ import ConsoleKit -import Dispatch +@preconcurrency import Dispatch import Vapor import NIOConcurrencyHelpers import NIOCore import Atomics -#if os(Linux) -import Glibc -#else -import Darwin.C -#endif /// The command to start the Queue job -public final class QueuesCommand: Command { - /// See `Command.signature` +public final class QueuesCommand: AsyncCommand, Sendable { + // See `Command.signature`. public let signature = Signature() - /// See `Command.Signature` + // See `Command.Signature`. public struct Signature: CommandSignature { - public init() { } + public init() {} @Option(name: "queue", help: "Specifies a single queue to run") var queue: String? @@ -26,57 +21,51 @@ public final class QueuesCommand: Command { var scheduled: Bool } - /// See `Command.help` - public var help: String { - return "Starts the Vapor Queues worker" - } + // See `Command.help`. + public var help: String { "Starts the Vapor Queues worker" } private let application: Application - private var jobTasks: [RepeatedTask] - private var scheduledTasks: [String: AnyScheduledJob.Task] - private var lock: NIOLock - private var signalSources: [DispatchSourceSignal] - private var didShutdown: Bool - private let isShuttingDown: ManagedAtomic + private let box: NIOLockedValueBox - private var eventLoopGroup: EventLoopGroup { - self.application.eventLoopGroup + struct Box: Sendable { + var jobTasks: [RepeatedTask] + var scheduledTasks: [String: AnyScheduledJob.Task] + var signalSources: [any DispatchSourceSignal] + var didShutdown: Bool } - - /// Create a new `QueueCommand` + + /// Create a new ``QueuesCommand``. + /// + /// - Parameters: + /// - application: The active Vapor `Application`. + /// - scheduled: This parameter is a historical artifact and has no effect. public init(application: Application, scheduled: Bool = false) { self.application = application - self.jobTasks = [] - self.scheduledTasks = [:] - self.isShuttingDown = .init(false) - self.signalSources = [] - self.didShutdown = false - self.lock = .init() + self.box = .init(.init(jobTasks: [], scheduledTasks: [:], signalSources: [], didShutdown: false)) } - /// Runs the command - /// - Parameters: - /// - context: A `CommandContext` for the command to run on - /// - signature: The signature of the command - public func run(using context: CommandContext, signature: QueuesCommand.Signature) throws { - self.application.logger.trace("Begginning the run function") - + // See `AsyncCommand.run(using:signature:)`. + public func run(using context: CommandContext, signature: QueuesCommand.Signature) async throws { // shutdown future - let promise = self.application.eventLoopGroup.next().makePromise(of: Void.self) + let promise = self.application.eventLoopGroup.any().makePromise(of: Void.self) self.application.running = .start(using: promise) // setup signal sources for shutdown let signalQueue = DispatchQueue(label: "codes.vapor.jobs.command") func makeSignalSource(_ code: Int32) { + #if canImport(Darwin) + /// https://github.com/swift-server/swift-service-lifecycle/blob/main/Sources/UnixSignals/UnixSignalsSequence.swift#L77-L82 + signal(code, SIG_IGN) + #endif + let source = DispatchSource.makeSignalSource(signal: code, queue: signalQueue) source.setEventHandler { print() // clear ^C promise.succeed(()) } source.resume() - self.signalSources.append(source) - signal(code, SIG_IGN) + self.box.withLockedValue { $0.signalSources.append(source) } } makeSignalSource(SIGTERM) makeSignalSource(SIGINT) @@ -85,55 +74,53 @@ public final class QueuesCommand: Command { self.application.logger.info("Starting scheduled jobs worker") try self.startScheduledJobs() } else { - let queue: QueueName = signature.queue - .flatMap { .init(string: $0) } ?? .default + let queue: QueueName = signature.queue.map { .init(string: $0) } ?? .default + self.application.logger.info("Starting jobs worker", metadata: ["queue": .string(queue.string)]) try self.startJobs(on: queue) } } /// Starts an in-process jobs worker for queued tasks + /// /// - Parameter queueName: The queue to run the jobs on public func startJobs(on queueName: QueueName) throws { let workerCount: Int switch self.application.queues.configuration.workerCount { case .default: - var count = 0 - for _ in self.eventLoopGroup.makeIterator() { - count += 1 - } - workerCount = count - self.application.logger.trace("Default workerCount, setting to \(workerCount)") + workerCount = self.application.eventLoopGroup.makeIterator().reduce(0, { n, _ in n + 1 }) + self.application.logger.trace("Using default worker count", metadata: ["workerCount": "\(workerCount)"]) case .custom(let custom): workerCount = custom - self.application.logger.trace("Custom workerCount, setting to \(workerCount)") + self.application.logger.trace("Using custom worker count", metadata: ["workerCount": "\(workerCount)"]) } - for i in 0.. = .init(.init()) + /// The number of seconds to wait before checking for the next job. Defaults to `1` - public var refreshInterval: TimeAmount + public var refreshInterval: TimeAmount { + get { self.dataBox.withLockedValue { $0.refreshInterval } } + set { self.dataBox.withLockedValue { $0.refreshInterval = newValue } } + } /// The key that stores the data about a job. Defaults to `vapor_queues` - public var persistenceKey: String + public var persistenceKey: String { + get { self.dataBox.withLockedValue { $0.persistenceKey } } + set { self.dataBox.withLockedValue { $0.persistenceKey = newValue } } + } /// Supported options for number of job handling workers. - public enum WorkerCount: ExpressibleByIntegerLiteral { + public enum WorkerCount: ExpressibleByIntegerLiteral, Sendable { /// One worker per event loop. case `default` @@ -24,50 +45,61 @@ public struct QueuesConfiguration { } /// Sets the number of workers used for handling jobs. - public var workerCount: WorkerCount + public var workerCount: WorkerCount { + get { self.dataBox.withLockedValue { $0.workerCount } } + set { self.dataBox.withLockedValue { $0.workerCount = newValue } } + } /// A logger public let logger: Logger // Arbitrary user info to be stored - public var userInfo: [AnyHashable: Any] + public var userInfo: [AnySendableHashable: any Sendable] { + get { self.dataBox.withLockedValue { $0.userInfo } } + set { self.dataBox.withLockedValue { $0.userInfo = newValue } } + } - var jobs: [String: AnyJob] - var scheduledJobs: [AnyScheduledJob] - var notificationHooks: [JobEventDelegate] + var jobs: [String: any AnyJob] { + get { self.dataBox.withLockedValue { $0.jobs } } + set { self.dataBox.withLockedValue { $0.jobs = newValue } } + } - /// Creates an empty `JobsConfig` + var scheduledJobs: [AnyScheduledJob] { + get { self.dataBox.withLockedValue { $0.scheduledJobs } } + set { self.dataBox.withLockedValue { $0.scheduledJobs = newValue } } + } + + var notificationHooks: [any JobEventDelegate] { + get { self.dataBox.withLockedValue { $0.notificationHooks } } + set { self.dataBox.withLockedValue { $0.notificationHooks = newValue } } + } + + /// Creates an empty ``QueuesConfiguration``. public init( refreshInterval: TimeAmount = .seconds(1), persistenceKey: String = "vapor_queues", workerCount: WorkerCount = .default, logger: Logger = .init(label: "codes.vapor.queues") ) { - self.jobs = [:] - self.scheduledJobs = [] self.logger = logger self.refreshInterval = refreshInterval self.persistenceKey = persistenceKey self.workerCount = workerCount - self.userInfo = [:] - self.notificationHooks = [] } - /// Adds a new `Job` to the queue configuration. - /// This must be called on all `Job` objects before they can be run in a queue. + /// Adds a new ``Job`` to the queue configuration. /// - /// - Parameter job: The `Job` to add. - mutating public func add(_ job: J) - where J: Job - { - self.logger.trace("Adding job type: \(J.name)") + /// This must be called on all ``Job`` objects before they can be run in a queue. + /// + /// - Parameter job: The ``Job`` to add. + mutating public func add(_ job: J) { + self.logger.trace("Adding job type", metadata: ["name": "\(J.name)"]) if let existing = self.jobs[J.name] { - self.logger.warning("A job is already registered with key \(J.name): \(existing)") + self.logger.warning("Job type is already registered", metadata: ["name": "\(J.name)", "existing": "\(existing)"]) } self.jobs[J.name] = job } - /// Schedules a new job for execution at a later date. /// /// config.schedule(Cleanup()) @@ -76,21 +108,20 @@ public struct QueuesConfiguration { /// .on(23) /// .at(.noon) /// - /// - Parameter job: The `ScheduledJob` to be scheduled. - mutating internal func schedule(_ job: J, builder: ScheduleBuilder = ScheduleBuilder()) -> ScheduleBuilder - where J: ScheduledJob - { - self.logger.trace("Scheduling \(job.name)") - let storage = AnyScheduledJob(job: job, scheduler: builder) - self.scheduledJobs.append(storage) + /// - Parameters: + /// - job: The ``ScheduledJob`` to schedule. + /// - builder: A ``ScheduleBuilder`` to use for scheduling. + /// - Returns: The passed-in ``ScheduleBuilder``. + mutating func schedule(_ job: some ScheduledJob, builder: ScheduleBuilder = .init()) -> ScheduleBuilder { + self.logger.trace("Scheduling job", metadata: ["job-name": "\(job.name)"]) + self.scheduledJobs.append(AnyScheduledJob(job: job, scheduler: builder)) return builder } /// Adds a notification hook that can receive status updates about jobs - /// - Parameter hook: The `NotificationHook` object - mutating public func add(_ hook: N) - where N: JobEventDelegate - { + /// + /// - Parameter hook: A ``JobEventDelegate`` to register. + mutating public func add(_ hook: some JobEventDelegate) { self.logger.trace("Adding notification hook") self.notificationHooks.append(hook) } diff --git a/Sources/Queues/QueuesDriver.swift b/Sources/Queues/QueuesDriver.swift index 02452d7..2a60ad0 100644 --- a/Sources/Queues/QueuesDriver.swift +++ b/Sources/Queues/QueuesDriver.swift @@ -1,18 +1,19 @@ /// A new driver for Queues -public protocol QueuesDriver { - /// Makes the queue worker - /// - Parameter context: The context of the job - func makeQueue(with context: QueueContext) -> Queue +public protocol QueuesDriver: Sendable { + /// Create or look up a named ``Queue`` instance. + /// + /// - Parameter context: The context for jobs on the queue. Also provides the queue name. + func makeQueue(with context: QueueContext) -> any Queue /// Shuts down the driver func shutdown() - /// Shut down the driver asyncrhonously. Helps avoid calling `.wait()` + /// Shut down the driver asynchronously. Helps avoid calling `.wait()` func asyncShutdown() async } extension QueuesDriver { public func asyncShutdown() async { - shutdown() + self.shutdown() } } diff --git a/Sources/Queues/QueuesEventLoopPreference.swift b/Sources/Queues/QueuesEventLoopPreference.swift index aa9e7fe..55307e6 100644 --- a/Sources/Queues/QueuesEventLoopPreference.swift +++ b/Sources/Queues/QueuesEventLoopPreference.swift @@ -1,11 +1,3 @@ -// -// JobsEventLoopPreference.swift -// -// -// Created by Jimmy McDermott on 10/24/19. -// - -import Foundation import NIOCore /// Determines which event loop the jobs worker uses while executing jobs. @@ -17,13 +9,13 @@ public enum QueuesEventLoopPreference { /// called back (delegated to) on the supplied EventLoop. /// If possible, the connection should also be on this EventLoop for /// improved performance. - case delegate(on: EventLoop) + case delegate(on: any EventLoop) /// Returns the delegate EventLoop given an EventLoopGroup. - public func delegate(for eventLoopGroup: EventLoopGroup) -> EventLoop { + public func delegate(for eventLoopGroup: any EventLoopGroup) -> any EventLoop { switch self { case .indifferent: - return eventLoopGroup.next() + return eventLoopGroup.any() case .delegate(let eventLoop): return eventLoop } diff --git a/Sources/Queues/RepeatedTask+Cancel.swift b/Sources/Queues/RepeatedTask+Cancel.swift index 841fa0c..db91990 100644 --- a/Sources/Queues/RepeatedTask+Cancel.swift +++ b/Sources/Queues/RepeatedTask+Cancel.swift @@ -1,23 +1,24 @@ import NIOCore +import Logging extension RepeatedTask { - func syncCancel(on eventLoop: EventLoop) { + func syncCancel(on eventLoop: any EventLoop) { do { let promise = eventLoop.makePromise(of: Void.self) self.cancel(promise: promise) try promise.futureResult.wait() } catch { - print("failed cancelling repeated task \(error)") + Logger(label: "codes.vapor.queues.repeatedtask").debug("Failed cancelling repeated task", metadata: ["error": "\(error)"]) } } - func asyncCancel(on eventLoop: EventLoop) async { + func asyncCancel(on eventLoop: any EventLoop) async { do { let promise = eventLoop.makePromise(of: Void.self) self.cancel(promise: promise) try await promise.futureResult.get() } catch { - print("failed cancelling repeated task \(error)") + Logger(label: "codes.vapor.queues.repeatedtask").debug("Failed cancelling repeated task", metadata: ["error": "\(error)"]) } } } diff --git a/Sources/Queues/Request+Queues.swift b/Sources/Queues/Request+Queues.swift index 6a1e6f4..1327cc7 100644 --- a/Sources/Queues/Request+Queues.swift +++ b/Sources/Queues/Request+Queues.swift @@ -3,18 +3,15 @@ import Vapor import NIOCore extension Request { - /// Returns the default job `Queue` - public var queue: Queue { + /// Get the default ``Queue``. + public var queue: any Queue { self.queues(.default) } - /// Returns the specific job `Queue` for the given queue name + /// Create or look up an instance of a named ``Queue`` and bind it to this request's event loop. + /// /// - Parameter queue: The queue name - public func queues(_ queue: QueueName) -> Queue { - self.application.queues.queue( - queue, - logger: self.logger, - on: self.eventLoop - ) + public func queues(_ queue: QueueName, logger: Logger? = nil) -> any Queue { + self.application.queues.queue(queue, logger: logger ?? self.logger, on: self.eventLoop) } } diff --git a/Sources/Queues/ScheduleBuilder.swift b/Sources/Queues/ScheduleBuilder.swift index 9a6a2b4..e42f58a 100644 --- a/Sources/Queues/ScheduleBuilder.swift +++ b/Sources/Queues/ScheduleBuilder.swift @@ -1,22 +1,21 @@ import Foundation /// An object that can be used to build a scheduled job -public final class ScheduleBuilder { - +public final class ScheduleBuilder: @unchecked Sendable { /// Months of the year public enum Month: Int { case january = 1 - case february = 2 - case march = 3 - case april = 4 - case may = 5 - case june = 6 - case july = 7 - case august = 8 - case september = 9 - case october = 10 - case november = 11 - case december = 12 + case february + case march + case april + case may + case june + case july + case august + case september + case october + case november + case december } /// Describes a day @@ -25,91 +24,50 @@ public final class ScheduleBuilder { case last case exact(Int) - public init(integerLiteral value: Int) { - self = .exact(value) - } + public init(integerLiteral value: Int) { self = .exact(value) } } /// Describes a day of the week public enum Weekday: Int { case sunday = 1 - case monday = 2 - case tuesday = 3 - case wednesday = 4 - case thursday = 5 - case friday = 6 - case saturday = 7 + case monday + case tuesday + case wednesday + case thursday + case friday + case saturday } /// Describes a time of day public struct Time: ExpressibleByStringLiteral, CustomStringConvertible { - var hour: Hour24 - var minute: Minute - /// Returns a `Time` object at midnight (12:00 AM) - public static var midnight: Time { - return .init(12, 00, .am) - } + public static var midnight: Time { .init(12, 00, .am) } /// Returns a `Time` object at noon (12:00 PM) - public static var noon: Time { - return .init(12, 00, .pm) - } - - /// The readable description of the time - public var description: String { - return "\(self.hour):\(self.minute)" - } + public static var noon: Time { .init(12, 00, .pm) } - init(_ hour: Hour24, _ minute: Minute) { - self.hour = hour - self.minute = minute - } + var hour: Hour24, minute: Minute + init(_ hour: Hour24, _ minute: Minute) { (self.hour, self.minute) = (hour, minute) } + init(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { - switch period { - case .am: - if hour.number == 12 && minute.number == 0 { - self.hour = .init(0) - } else { - self.hour = .init(hour.number) - } - case .pm: - if hour.number == 12 { - self.hour = .init(12) - } else { - self.hour = .init(hour.number + 12) - } - } - self.minute = minute + self.init(.init(hour.n % 12 + (period == .am ? 0 : 12)), minute) } - - /// Takes a stringLiteral and returns a `TimeObject`. Must be in the format `00:00am/pm` + public init(stringLiteral value: String) { - let parts = value.split(separator: ":") + let parts = value.split(separator: ":", maxSplits: 1) + + guard let hour = Int(parts[0]) else { fatalError("Could not convert hour to Int") } switch parts.count { case 1: - guard let hour = Int(parts[0]) else { - fatalError("Could not convert hour to Int") - } self.init(Hour24(hour), 0) case 2: - guard let hour = Int(parts[0]) else { - fatalError("Could not convert hour to Int") - } + guard let minute = Int(parts[1].prefix(2)) else { fatalError("Could not convert minute to Int") } switch parts[1].count { case 2: - guard let minute = Int(parts[1]) else { - fatalError("Could not convert minute to Int") - } self.init(Hour24(hour), Minute(minute)) case 4: - let s = parts[1] - guard let minute = Int(s[s.startIndex.. 0, "12-hour clock cannot preceed 1") - precondition(number <= 12, "12-hour clock cannot exceed 12") - self.number = number - } - - /// Takes an integerLiteral and creates a `Hour12`. Must be `> 0 && <= 12` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((1 ... 12).contains(n), "12-hour clock must be in range 1-12"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { "\(self.n)" } } /// Represents an hour numeral that must be in 24 hour format public struct Hour24: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int - - /// The readable description of the hour, zero padding included - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } - - init(_ number: Int) { - precondition(number >= 0, "24-hour clock cannot preceed 0") - precondition(number < 24, "24-hour clock cannot exceed 24") - self.number = number - } - - /// Takes an integerLiteral and creates a `Hour24`. Must be `>= 0 && < 24` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((0 ..< 24).contains(n), "24-hour clock must be in range 0-23"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { String("0\(self.n)".suffix(2)) } } /// A period of hours - either `am` or `pm` - public enum HourPeriod: ExpressibleByStringLiteral, CustomStringConvertible { - case am - case pm - - /// The readable string - public var description: String { - switch self { - case .am: - return "am" - case .pm: - return "pm" - } - } - - init(_ string: String) { - switch string.lowercased() { - case "am": - self = .am - case "pm": - self = .pm - default: - fatalError("Unknown hour period: \(string), must be am or pm") - } - } - - /// Takes a stringLiteral and creates a `HourPeriod.` Must be `am` or `pm` - public init(stringLiteral value: String) { - self.init(value) - } + public enum HourPeriod: String, ExpressibleByStringLiteral, CustomStringConvertible, Hashable { + case am, pm + + init(_ string: String) { self.init(rawValue: string)! } + + public init(stringLiteral value: String) { self.init(value) } + + public var description: String { self.rawValue } } /// Describes a minute numeral public struct Minute: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int - - /// The readable minute, zero padded. - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } - - init(_ number: Int) { - assert(number >= 0, "Minute cannot preceed 0") - assert(number < 60, "Minute cannot exceed 60") - self.number = number - } - - /// Takes an integerLiteral and creates a `Minute`. Must be `>= 0 && < 60` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((0 ..< 60).contains(n), "Minute must be in range 0-59"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { String("0\(self.n)".suffix(2)) } } /// Describes a second numeral public struct Second: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int + let n: Int - /// The readable second, zero padded. - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } + init(_ n: Int) { precondition((0 ..< 60).contains(n), "Second must be in range 0-59"); self.n = n } - init(_ number: Int) { - assert(number >= 0, "Second cannot preceed 0") - assert(number < 60, "Second cannot exceed 60") - self.number = number - } + public init(integerLiteral value: Int) { self.init(value) } - /// Takes an integerLiteral and creates a `Second`. Must be `>= 0 && < 60` - public init(integerLiteral value: Int) { - self.init(value) - } + public var description: String { String("0\(self.n)".suffix(2)) } } - // MARK: Builders - /// An object to build a `Yearly` scheduled job public struct Yearly { let builder: ScheduleBuilder - /// The month to run the job in - /// - Parameter month: A `Month` to run the job in - public func `in`(_ month: Month) -> Monthly { - self.builder.month = month - return self.builder.monthly() - } + public func `in`(_ month: Month) -> Monthly { self.builder.month = month; return self.builder.monthly() } } /// An object to build a `Monthly` scheduled job public struct Monthly { let builder: ScheduleBuilder - /// The day to run the job on - /// - Parameter day: A `Day` to run the job on - public func on(_ day: Day) -> Daily { - self.builder.day = day - return self.builder.daily() - } + public func on(_ day: Day) -> Daily { self.builder.day = day; return self.builder.daily() } } /// An object to build a `Weekly` scheduled job public struct Weekly { let builder: ScheduleBuilder - /// The day of week to run the job on - /// - Parameter dayOfWeek: A `DayOfWeek` to run the job on - public func on(_ weekday: Weekday) -> Daily { - self.builder.weekday = weekday - return self.builder.daily() - } + public func on(_ weekday: Weekday) -> Daily { self.builder.weekday = weekday; return self.builder.daily() } } /// An object to build a `Daily` scheduled job public struct Daily { let builder: ScheduleBuilder - /// The time to run the job at - /// - Parameter time: A `Time` object to run the job on - public func at(_ time: Time) { - self.builder.time = time - } + public func at(_ time: Time) { self.builder.time = time } - /// The 24 hour time to run the job at - /// - Parameter hour: A `Hour24` to run the job at - /// - Parameter minute: A `Minute` to run the job at - public func at(_ hour: Hour24, _ minute: Minute) { - self.at(.init(hour, minute)) - } + public func at(_ hour: Hour24, _ minute: Minute) { self.at(.init(hour, minute)) } - /// The 12 hour time to run the job at - /// - Parameter hour: A `Hour12` to run the job at - /// - Parameter minute: A `Minute` to run the job at - /// - Parameter period: A `HourPeriod` to run the job at (`am` or `pm`) - public func at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { - self.at(.init(hour, minute, period)) - } + public func at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { self.at(.init(hour, minute, period)) } } /// An object to build a `Hourly` scheduled job public struct Hourly { let builder: ScheduleBuilder - /// The minute to run the job at - /// - Parameter minute: A `Minute` to run the job at - public func at(_ minute: Minute) { - self.builder.minute = minute - } + public func at(_ minute: Minute) { self.builder.minute = minute } } /// An object to build a `EveryMinute` scheduled job public struct Minutely { let builder: ScheduleBuilder - - /// The second to run the job at - /// - Parameter second: A `Second` to run the job at - public func at(_ second: Second) { - self.builder.second = second - } + + public func at(_ second: Second) { self.builder.second = second } } - /// Retrieves the next date - /// - Parameter current: The current date - /// - Returns: The next date + /// Retrieves the next date after the one given. public func nextDate(current: Date = .init()) -> Date? { - if let date = self.date, date > current { - return date - } - + if let date = self.date, date > current { return date } + var components = DateComponents() - if let milliseconds = millisecond { - components.nanosecond = milliseconds - } - if let second = self.second { - components.second = second.number - } - if let minute = self.minute { - components.minute = minute.number - } - if let time = self.time { - components.minute = time.minute.number - components.hour = time.hour.number - } - if let weekday = self.weekday { - components.weekday = weekday.rawValue - } - if let day = self.day { - switch day { - case .first: - components.day = 1 - case .last: - fatalError("Last day of the month is not yet supported.") - case .exact(let exact): - components.day = exact - } - } - if let month = self.month { - components.month = month.rawValue - } - return calendar.nextDate( - after: current, - matching: components, - matchingPolicy: .strict - ) + components.nanosecond = self.millisecond.map { $0 * 1_000_000 } + components.second = self.second?.n + components.minute = self.time?.minute.n ?? self.minute?.n + components.hour = self.time?.hour.n + components.weekday = self.weekday?.rawValue + switch self.day { + case .first?: components.day = 1 + case .exact(let exact)?: components.day = exact + case .last?: fatalError("Last day of the month is not yet supported.") + default: break + } + components.month = self.month?.rawValue + return calendar.nextDate(after: current, matching: components, matchingPolicy: .strict) } - /// The caledar used to compute the next date + /// The calendar used to compute the next date var calendar: Calendar /// Date to perform task (one-off job) var date: Date? - var month: Month? - var day: Day? - var weekday: Weekday? - var time: Time? - var minute: Minute? - var second: Second? - var millisecond: Int? - - public init(calendar: Calendar = .current) { - self.calendar = calendar - } + var month: Month?, day: Day?, weekday: Weekday? + var time: Time?, minute: Minute?, second: Second?, millisecond: Int? + + public init(calendar: Calendar = .current) { self.calendar = calendar } - // MARK: Helpers - /// Schedules a job using a specific `Calendar` - public func using(_ calendar: Calendar) -> ScheduleBuilder { - self.calendar = calendar - return self - } - + public func using(_ calendar: Calendar) -> ScheduleBuilder { self.calendar = calendar; return self } + /// Schedules a job at a specific date - public func at(_ date: Date) { - self.date = date - } - + public func at(_ date: Date) { self.date = date } + /// Creates a yearly scheduled job for further building - public func yearly() -> Yearly { - return Yearly(builder: self) - } + @discardableResult public func yearly() -> Yearly { .init(builder: self) } /// Creates a monthly scheduled job for further building - public func monthly() -> Monthly { - return Monthly(builder: self) - } + @discardableResult public func monthly() -> Monthly { .init(builder: self) } /// Creates a weekly scheduled job for further building - public func weekly() -> Weekly { - return Weekly(builder: self) - } + @discardableResult public func weekly() -> Weekly { .init(builder: self) } /// Creates a daily scheduled job for further building - public func daily() -> Daily { - return Daily(builder: self) - } + @discardableResult public func daily() -> Daily { .init(builder: self) } /// Creates a hourly scheduled job for further building - public func hourly() -> Hourly { - return Hourly(builder: self) - } - + @discardableResult public func hourly() -> Hourly { .init(builder: self) } + /// Creates a minutely scheduled job for further building - @discardableResult - public func minutely() -> Minutely { - return Minutely(builder: self) - } - + @discardableResult public func minutely() -> Minutely { .init(builder: self) } + /// Runs a job every second - public func everySecond() { - self.millisecond = 0 - } + public func everySecond() { self.millisecond = 0 } } - diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index b1be2ce..6fde1fb 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -3,10 +3,12 @@ import Foundation import Logging /// Describes a job that can be scheduled and repeated -public protocol ScheduledJob { +public protocol ScheduledJob: Sendable { var name: String { get } - /// The method called when the job is run - /// - Parameter context: A `JobContext` that can be used + + /// The method called when the job is run. + /// + /// - Parameter context: The ``QueueContext``. func run(context: QueueContext) -> EventLoopFuture } @@ -14,37 +16,40 @@ extension ScheduledJob { public var name: String { "\(Self.self)" } } -class AnyScheduledJob { - let job: ScheduledJob +final class AnyScheduledJob: Sendable { + let job: any ScheduledJob let scheduler: ScheduleBuilder - init(job: ScheduledJob, scheduler: ScheduleBuilder) { + init(job: any ScheduledJob, scheduler: ScheduleBuilder) { self.job = job self.scheduler = scheduler } -} -extension AnyScheduledJob { - struct Task { + struct Task: Sendable { let task: RepeatedTask let done: EventLoopFuture } func schedule(context: QueueContext) -> Task? { - context.logger.trace("Beginning the scheduler process") + var logger_ = context.logger + logger_[metadataKey: "job-name"] = "\(self.job.name)" + let logger = logger_ + + logger.trace("Beginning the scheduler process") + guard let date = self.scheduler.nextDate() else { - context.logger.debug("No date scheduled for \(self.job.name)") + logger.debug("Scheduler returned no date") return nil } - context.logger.debug("Scheduling \(self.job.name) to run at \(date)") + logger.debug("Job scheduled", metadata: ["scheduled-date": "\(date)"]) + let promise = context.eventLoop.makePromise(of: Void.self) let task = context.eventLoop.scheduleRepeatedTask( - initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), - delay: .seconds(0) + initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), delay: .zero ) { task in // always cancel task.cancel() - context.logger.trace("Running the scheduled job \(self.job.name)") + logger.trace("Running scheduled job") self.job.run(context: context).cascade(to: promise) } return .init(task: task, done: promise.futureResult) diff --git a/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg b/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg new file mode 100644 index 0000000..f40f693 --- /dev/null +++ b/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + diff --git a/Sources/XCTQueues/Docs.docc/theme-settings.json b/Sources/XCTQueues/Docs.docc/theme-settings.json new file mode 100644 index 0000000..1e0af2e --- /dev/null +++ b/Sources/XCTQueues/Docs.docc/theme-settings.json @@ -0,0 +1,21 @@ +{ + "theme": { + "aside": { "border-radius": "16px", "border-style": "double", "border-width": "3px" }, + "border-radius": "0", + "button": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "code": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "color": { + "queues": "#e8665a", + "documentation-intro-fill": "radial-gradient(circle at top, var(--color-queues) 30%, #000 100%)", + "documentation-intro-accent": "var(--color-queues)", + "logo-base": { "dark": "#fff", "light": "#000" }, + "logo-shape": { "dark": "#000", "light": "#fff" }, + "fill": { "dark": "#000", "light": "#fff" } + }, + "icons": { "technology": "/queues/images/vapor-queues-logo.svg" } + }, + "features": { + "quickNavigation": { "enable": true }, + "i18n": { "enable": true } + } +} diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index a4ed842..ace6885 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -10,17 +10,20 @@ extension Application.Queues.Provider { return $0.queues.use(custom: TestQueuesDriver()) } } + + public static var asyncTest: Self { + .init { + $0.queues.initializeAsyncTestStorage() + return $0.queues.use(custom: AsyncTestQueuesDriver()) + } + } } struct TestQueuesDriver: QueuesDriver { - let lock: NIOLock - - init() { - self.lock = .init() - } + init() {} - func makeQueue(with context: QueueContext) -> Queue { - TestQueue(lock: self.lock, context: context) + func makeQueue(with context: QueueContext) -> any Queue { + TestQueue(_context: .init(context)) } func shutdown() { @@ -28,50 +31,60 @@ struct TestQueuesDriver: QueuesDriver { } } +struct AsyncTestQueuesDriver: QueuesDriver { + init() {} + func makeQueue(with context: QueueContext) -> any Queue { AsyncTestQueue(_context: .init(context)) } + func shutdown() {} +} + extension Application.Queues { - public final class TestQueueStorage { - public var jobs: [JobIdentifier: JobData] = [:] - public var queue: [JobIdentifier] = [] - - /// Returns all jobs in the queue of the specific `J` type. - public func all(_ job: J.Type) -> [J.Payload] - where J: Job - { - let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } + public final class TestQueueStorage: Sendable { + private struct Box: Sendable { + var jobs: [JobIdentifier: JobData] = [:] + var queue: [JobIdentifier] = [] + } + private let box = NIOLockedValueBox(.init()) - return queue + public var jobs: [JobIdentifier: JobData] { + get { self.box.withLockedValue { $0.jobs } } + set { self.box.withLockedValue { $0.jobs = newValue } } + } + + public var queue: [JobIdentifier] { + get { self.box.withLockedValue { $0.queue } } + set { self.box.withLockedValue { $0.queue = newValue } } + } + + /// Returns the payloads of all jobs in the queue having type `J`. + public func all(_ job: J.Type) -> [J.Payload] { + let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } + + return self.queue .filter { filteredJobIds.contains($0) } .compactMap { jobs[$0] } .compactMap { try? J.parsePayload($0.payload) } } - /// Returns the first job in the queue of the specific `J` type. - public func first(_ job: J.Type) -> J.Payload? - where J: Job - { + /// Returns the payload of the first job in the queue having type `J`. + public func first(_ job: J.Type) -> J.Payload? { let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } - guard - let queueJob = queue.first(where: { filteredJobIds.contains($0) }), - let jobData = jobs[queueJob] - else { - return nil - } + guard let queueJob = self.queue.first(where: { filteredJobIds.contains($0) }), let jobData = self.jobs[queueJob] else { + return nil + } return try? J.parsePayload(jobData.payload) } /// Checks whether a job of type `J` was dispatched to queue - public func contains(_ job: J.Type) -> Bool - where J: Job - { - return first(job) != nil + public func contains(_ job: J.Type) -> Bool { + self.first(job) != nil } } struct TestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } - + public var test: TestQueueStorage { self.application.storage[TestQueueKey.self]! } @@ -79,50 +92,66 @@ extension Application.Queues { func initializeTestStorage() { self.application.storage[TestQueueKey.self] = .init() } + + struct AsyncTestQueueKey: StorageKey, LockKey { + typealias Value = TestQueueStorage + } + + public var asyncTest: TestQueueStorage { + self.application.storage[AsyncTestQueueKey.self]! + } + + func initializeAsyncTestStorage() { + self.application.storage[AsyncTestQueueKey.self] = .init() + } } struct TestQueue: Queue { - let lock: NIOLock - let context: QueueContext + let _context: NIOLockedValueBox + var context: QueueContext { self._context.withLockedValue { $0 } } func get(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - return self.context.eventLoop.makeSucceededFuture( - self.context.application.queues.test.jobs[id]! - ) + self._context.withLockedValue { context in + context.eventLoop.makeSucceededFuture(context.application.queues.test.jobs[id]!) + } } func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.jobs[id] = data - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.jobs[id] = data + return context.eventLoop.makeSucceededVoidFuture() + } } func clear(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.jobs[id] = nil - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.jobs[id] = nil + return context.eventLoop.makeSucceededVoidFuture() + } } func pop() -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - let last = context.application.queues.test.queue.popLast() - return self.context.eventLoop.makeSucceededFuture(last) + self._context.withLockedValue { context in + let last = context.application.queues.test.queue.popLast() + return context.eventLoop.makeSucceededFuture(last) + } } func push(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.queue.append(id) - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.queue.append(id) + return context.eventLoop.makeSucceededVoidFuture() + } } } + +struct AsyncTestQueue: AsyncQueue { + let _context: NIOLockedValueBox + var context: QueueContext { self._context.withLockedValue { $0 } } + + func get(_ id: JobIdentifier) async throws -> JobData { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id]! } } + func set(_ id: JobIdentifier, to data: JobData) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = data } } + func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } } + func pop() async throws -> JobIdentifier? { self._context.withLockedValue { $0.application.queues.asyncTest.queue.popLast() } } + func push(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.queue.append(id) } } +} diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 7c0f42e..d77cd01 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -1,16 +1,26 @@ import Queues -import Foundation -import Vapor import XCTest import XCTVapor -import XCTQueues -@testable import Vapor -import NIOCore -import NIOConcurrencyHelpers + +func XCTAssertNoThrowAsync( + _ expression: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + _ = try await expression() + } catch { + XCTAssertNoThrow(try { throw error }(), message(), file: file, line: line) + } +} final class AsyncQueueTests: XCTestCase { var app: Application! + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + override func setUp() async throws { app = try await Application.make(.testing) } @@ -19,15 +29,15 @@ final class AsyncQueueTests: XCTestCase { try await app.asyncShutdown() } - func testAsyncJob() async throws { + func testAsyncJobWithSyncQueue() async throws { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: Void.self) app.queues.add(MyAsyncJob(promise: promise)) app.get("foo") { req in - req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) - .map { _ in "done" } + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" } try await app.testable().test(.GET, "foo") { res async in @@ -46,7 +56,37 @@ final class AsyncQueueTests: XCTestCase { XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) - try XCTAssertNoThrow(promise.futureResult.wait()) + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) + } + + func testAsyncJobWithAsyncQueue() async throws { + app.queues.use(.asyncTest) + + let promise = app.eventLoopGroup.any().makePromise(of: Void.self) + app.queues.add(MyAsyncJob(promise: promise)) + + app.get("foo") { req in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + XCTAssertEqual(app.queues.asyncTest.queue.count, 1) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 1) + let job = app.queues.asyncTest.first(MyAsyncJob.self) + XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + + try await app.queues.queue.worker.run().get() + XCTAssertEqual(app.queues.asyncTest.queue.count, 0) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 0) + + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } } @@ -58,7 +98,6 @@ struct MyAsyncJob: AsyncJob { } func dequeue(_ context: QueueContext, _ payload: Data) async throws { - promise.succeed(()) - return + self.promise.succeed() } } diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index 562287b..cd30736 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -1,353 +1,553 @@ import Atomics import Queues -import Vapor -import Foundation import XCTest import XCTVapor import XCTQueues -import NIOCore import NIOConcurrencyHelpers -@testable import Vapor + +func XCTAssertEqualAsync( + _ expression1: @autoclosure () async throws -> T, + _ expression2: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async where T: Equatable { + do { + let expr1 = try await expression1(), expr2 = try await expression2() + return XCTAssertEqual(expr1, expr2, message(), file: file, line: line) + } catch { + return XCTAssertEqual(try { () -> Bool in throw error }(), false, message(), file: file, line: line) + } +} + +func XCTAssertTrueAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertTrue(result, message(), file: file, line: line) + } catch { + return XCTAssertTrue(try { throw error }(), message(), file: file, line: line) + } +} + +func XCTAssertFalseAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertFalse(result, message(), file: file, line: line) + } catch { + return XCTAssertFalse(try { throw error }(), message(), file: file, line: line) + } +} final class QueueTests: XCTestCase { - func testVaporIntegrationWithInProcessJob() throws { - let app = Application(.testing) - app.queues.use(.test) - defer { app.shutdown() } - - let jobSignal = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: jobSignal)) - try app.queues.startInProcessJobs(on: .default) + var app: Application! + + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + + override func setUp() async throws { + self.app = try await Application.make(.testing) + self.app.queues.use(.test) + } - app.get("bar") { req in - req.queue.dispatch(Foo.self, .init(foo: "Bar payload")) - .map { _ in "job bar dispatched" } + override func tearDown() async throws { + try await self.app.asyncShutdown() + self.app = nil + } + + func testVaporIntegrationWithInProcessJob() async throws { + let jobSignal1 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: jobSignal1)) + let jobSignal2 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo2(promise: jobSignal2)) + try self.app.queues.startInProcessJobs(on: .default) + + self.app.get("bar1") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "Bar payload")).get() + return "job bar dispatched" + } + + self.app.get("bar2") { req in + try await req.queue.dispatch(Foo2.self, .init(foo: "Bar payload")) + return "job bar dispatched" } - try app.testable().test(.GET, "bar") { res in + try await self.app.testable().test(.GET, "bar1") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "job bar dispatched") + }.test(.GET, "bar2") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "job bar dispatched") } - try XCTAssertEqual(jobSignal.futureResult.wait(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal1.futureResult.get(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal2.futureResult.get(), "Bar payload") } - func testVaporIntegration() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar")) - .map { _ in "done" } + func testVaporIntegration() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: promise)) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Foo.self) - XCTAssert(app.queues.test.contains(Foo.self)) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - try app.queues.queue.worker.run().wait() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testSettingCustomId() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) - .map { _ in "done" } + func testSettingCustomId() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: promise)) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - XCTAssertTrue(app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + XCTAssert(self.app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testScheduleBuilderAPI() throws { - let app = Application(.testing) - defer { app.shutdown() } - + func testScheduleBuilderAPI() async throws { // yearly - app.queues.schedule(Cleanup()) - .yearly() - .in(.may) - .on(23) - .at(.noon) + self.app.queues.schedule(Cleanup()).yearly().in(.may).on(23).at(.noon) // monthly - app.queues.schedule(Cleanup()) - .monthly() - .on(15) - .at(.midnight) + self.app.queues.schedule(Cleanup()).monthly().on(15).at(.midnight) // weekly - app.queues.schedule(Cleanup()) - .weekly() - .on(.monday) - .at("3:13am") + self.app.queues.schedule(Cleanup()).weekly().on(.monday).at("3:13am") // daily - app.queues.schedule(Cleanup()) - .daily() - .at("5:23pm") + self.app.queues.schedule(Cleanup()).daily().at("5:23pm") // daily 2 - app.queues.schedule(Cleanup()) - .daily() - .at(5, 23, .pm) + self.app.queues.schedule(Cleanup()).daily().at(5, 23, .pm) // daily 3 - app.queues.schedule(Cleanup()) - .daily() - .at(17, 23) + self.app.queues.schedule(Cleanup()).daily().at(17, 23) // hourly - app.queues.schedule(Cleanup()) - .hourly() - .at(30) + self.app.queues.schedule(Cleanup()).hourly().at(30) } - func testRepeatingScheduledJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + func testRepeatingScheduledJob() async throws { + let scheduledJob = TestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + self.app.queues.schedule(scheduledJob).everySecond() + try self.app.queues.startScheduledJobs() - XCTAssertEqual(TestingScheduledJob.count.load(ordering: .relaxed), 0) - app.queues.schedule(TestingScheduledJob()).everySecond() - try app.queues.startScheduledJobs() + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) + promise.succeed() + } + + try await promise.futureResult.get() + } + + func testAsyncRepeatingScheduledJob() async throws { + let scheduledJob = AsyncTestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + self.app.queues.schedule(scheduledJob).everySecond() + try self.app.queues.startScheduledJobs() - let promise = app.eventLoopGroup.next().makePromise(of: Void.self) - app.eventLoopGroup.next().scheduleTask(in: .seconds(5)) { () -> Void in - XCTAssert(TestingScheduledJob.count.load(ordering: .relaxed) > 4) - promise.succeed(()) + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) + promise.succeed() } - try promise.futureResult.wait() + try await promise.futureResult.get() } - func testFailingScheduledJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + func testFailingScheduledJob() async throws { + self.app.queues.schedule(FailingScheduledJob()).everySecond() + try self.app.queues.startScheduledJobs() - app.queues.schedule(FailingScheduledJob()).everySecond() - try app.queues.startScheduledJobs() + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { + promise.succeed() + } + try await promise.futureResult.get() + } + + func testAsyncFailingScheduledJob() async throws { + self.app.queues.schedule(AsyncFailingScheduledJob()).everySecond() + try self.app.queues.startScheduledJobs() - let promise = app.eventLoopGroup.next().makePromise(of: Void.self) - app.eventLoopGroup.next().scheduleTask(in: .seconds(1)) { () -> Void in - promise.succeed(()) + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { + promise.succeed() } - try promise.futureResult.wait() + try await promise.futureResult.get() } - func testCustomWorkerCount() throws { + func testCustomWorkerCount() async throws { // Setup custom ELG with 4 threads let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) - defer { try! eventLoopGroup.syncShutdownGracefully() } + + do { + let count = self.app.eventLoopGroup.any().makePromise(of: Int.self) + self.app.queues.use(custom: WorkerCountDriver(count: count)) + // Limit worker count to less than 4 threads + self.app.queues.configuration.workerCount = 2 + + try self.app.queues.startInProcessJobs(on: .default) + await XCTAssertEqualAsync(try await count.futureResult.get(), 2) + } catch { + try? await eventLoopGroup.shutdownGracefully() + throw error + } + try await eventLoopGroup.shutdownGracefully() + } - let app = Application(.testing, .shared(eventLoopGroup)) - defer { app.shutdown() } + func testSuccessHooks() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = SuccessHook() + let errorHook = ErrorHook() + let dispatchHook = DispatchHook() + let dequeuedHook = DequeuedHook() + self.app.queues.add(Foo1(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + self.app.queues.add(dispatchHook) + self.app.queues.add(dequeuedHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" + } - let count = app.eventLoopGroup.next().makePromise(of: Int.self) - app.queues.use(custom: WorkerCountDriver(count: count)) - // Limit worker count to less than 4 threads - app.queues.configuration.workerCount = 2 + XCTAssertFalse(dispatchHook.successHit) + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + XCTAssertTrue(dispatchHook.successHit) + } - try app.queues.startInProcessJobs(on: .default) - try XCTAssertEqual(count.futureResult.wait(), 2) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + XCTAssertFalse(dequeuedHook.successHit) + + try await self.app.queues.queue.worker.run() + XCTAssertTrue(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + XCTAssertTrue(dequeuedHook.successHit) + + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testSuccessHooks() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) + func testAsyncSuccessHooks() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + let dispatchHook = AsyncDispatchHook() + let dequeuedHook = AsyncDequeuedHook() + self.app.queues.add(Foo1(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + self.app.queues.add(dispatchHook) + self.app.queues.add(dequeuedHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" + } + + await XCTAssertFalseAsync(await dispatchHook.successHit) + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + await XCTAssertTrueAsync(await dispatchHook.successHit) + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + await XCTAssertFalseAsync(await dequeuedHook.successHit) - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - app.queues.add(DispatchHook()) - app.queues.add(DequeuedHook()) - ErrorHook.errorCount = 0 - DequeuedHook.successHit = false + try await self.app.queues.queue.worker.run() + await XCTAssertTrueAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + await XCTAssertTrueAsync(await dequeuedHook.successHit) - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar")) - .map { _ in "done" } + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") + } + + func testFailureHooks() async throws { + self.app.queues.use(.test) + self.app.queues.add(Bar()) + let successHook = SuccessHook() + let errorHook = ErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) + return "done" } - XCTAssertEqual(DispatchHook.successHit, false) - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") - XCTAssertEqual(DispatchHook.successHit, true) } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Foo.self) - XCTAssert(app.queues.test.contains(Foo.self)) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Bar.self) + XCTAssert(self.app.queues.test.contains(Bar.self)) XCTAssertNotNil(job) - XCTAssertEqual(job!.foo, "bar") - XCTAssertEqual(DequeuedHook.successHit, false) - - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, true) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - XCTAssertEqual(DequeuedHook.successHit, true) + + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testAsyncFailureHooks() async throws { + self.app.queues.use(.test) + self.app.queues.add(Bar()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + self.app.get("foo") { req in + try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Bar.self) + XCTAssert(self.app.queues.test.contains(Bar.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) } - func testFailureHooks() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - app.queues.add(Bar()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 + func testFailureHooksWithDelay() async throws { + self.app.queues.add(Baz()) + let successHook = SuccessHook() + let errorHook = ErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) - app.get("foo") { req in - req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) - .map { _ in "done" } + self.app.get("foo") { req in + try await req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Bar.self) - XCTAssert(app.queues.test.contains(Bar.self)) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + var job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - } - - func testFailureHooksWithDelay() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - app.queues.add(Baz()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 - - app.get("foo") { req in - req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) - .map { _ in "done" } + sleep(1) + + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testAsyncFailureHooksWithDelay() async throws { + self.app.queues.add(Baz()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - var job = app.queues.test.first(Baz.self) - XCTAssert(app.queues.test.contains(Baz.self)) + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + var job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - job = app.queues.test.first(Baz.self) - XCTAssert(app.queues.test.contains(Baz.self)) + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) sleep(1) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testStuffThatIsntActuallyUsedAnywhere() { + XCTAssertEqual(self.app.queues.queue(.default).key, "vapor_queues[default]") + XCTAssertNotNil(QueuesEventLoopPreference.indifferent.delegate(for: self.app.eventLoopGroup)) + XCTAssertNotNil(QueuesEventLoopPreference.delegate(on: self.app.eventLoopGroup.any()).delegate(for: self.app.eventLoopGroup)) } } -class DispatchHook: JobEventDelegate { - static var successHit = false +final class DispatchHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -class SuccessHook: JobEventDelegate { - static var successHit = false +final class SuccessHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -class ErrorHook: JobEventDelegate { - static var errorCount = 0 +final class ErrorHook: JobEventDelegate, @unchecked Sendable { + var errorCount = 0 - func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { - Self.errorCount += 1 - return eventLoop.future() + func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + self.errorCount += 1 + return eventLoop.makeSucceededVoidFuture() } } -class DequeuedHook: JobEventDelegate { - static var successHit = false +final class DequeuedHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -final class WorkerCountDriver: QueuesDriver { +actor AsyncDispatchHook: AsyncJobEventDelegate { + var successHit = false + func dispatched(job: JobEventData) async throws { self.successHit = true } +} + +actor AsyncSuccessHook: AsyncJobEventDelegate { + var successHit = false + func success(jobId: String) async throws { self.successHit = true } +} + +actor AsyncErrorHook: AsyncJobEventDelegate { + var errorCount = 0 + func error(jobId: String, error: any Error) async throws { self.errorCount += 1 } +} + +actor AsyncDequeuedHook: AsyncJobEventDelegate { + var successHit = false + func didDequeue(jobId: String) async throws { self.successHit = true } +} + +final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let count: EventLoopPromise let lock: NIOLock var recordedEventLoops: Set @@ -358,11 +558,11 @@ final class WorkerCountDriver: QueuesDriver { self.recordedEventLoops = [] } - func makeQueue(with context: QueueContext) -> Queue { + func makeQueue(with context: QueueContext) -> any Queue { WorkerCountQueue(driver: self, context: context) } - func record(eventLoop: EventLoop) { + func record(eventLoop: any EventLoop) { self.lock.lock() defer { self.lock.unlock() } let previousCount = self.recordedEventLoops.count @@ -381,53 +581,60 @@ final class WorkerCountDriver: QueuesDriver { let driver: WorkerCountDriver var context: QueueContext - func get(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } - - func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { - fatalError() - } - - func clear(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } - + func get(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { fatalError() } + func clear(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } func pop() -> EventLoopFuture { self.driver.record(eventLoop: self.context.eventLoop) return self.context.eventLoop.makeSucceededFuture(nil) } - - func push(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } + func push(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } } } -struct Failure: Error { } +struct Failure: Error {} + struct FailingScheduledJob: ScheduledJob { - func run(context: QueueContext) -> EventLoopFuture { - context.eventLoop.makeFailedFuture(Failure()) - } + func run(context: QueueContext) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Failure()) } +} + +struct AsyncFailingScheduledJob: AsyncScheduledJob { + func run(context: QueueContext) async throws { throw Failure() } } struct TestingScheduledJob: ScheduledJob { - static var count = ManagedAtomic(0) + var count = ManagedAtomic(0) func run(context: QueueContext) -> EventLoopFuture { - TestingScheduledJob.count.wrappingIncrement(ordering: .relaxed) - return context.eventLoop.future() + self.count.wrappingIncrement(ordering: .relaxed) + return context.eventLoop.makeSucceededVoidFuture() } } -extension ByteBuffer { - var string: String { - return .init(decoding: self.readableBytesView, as: UTF8.self) - } +struct AsyncTestingScheduledJob: AsyncScheduledJob { + var count = ManagedAtomic(0) + func run(context: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } } +struct Foo1: Job { + let promise: EventLoopPromise + + struct Data: Codable { + var foo: String + } + + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + self.promise.succeed(data.foo) + return context.eventLoop.makeSucceededVoidFuture() + } + + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + self.promise.fail(error) + return context.eventLoop.makeSucceededVoidFuture() + } +} -struct Foo: Job { +struct Foo2: Job { let promise: EventLoopPromise struct Data: Codable { @@ -436,12 +643,12 @@ struct Foo: Job { func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) - return context.eventLoop.makeSucceededFuture(()) + return context.eventLoop.makeSucceededVoidFuture() } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { self.promise.fail(error) - return context.eventLoop.makeSucceededFuture(()) + return context.eventLoop.makeSucceededVoidFuture() } } @@ -451,11 +658,11 @@ struct Bar: Job { } func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeFailedFuture(Abort(.badRequest)) + context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + context.eventLoop.makeSucceededVoidFuture() } } @@ -465,15 +672,14 @@ struct Baz: Job { } func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeFailedFuture(Abort(.badRequest)) + context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + context.eventLoop.makeSucceededVoidFuture() } func nextRetryIn(attempt: Int) -> Int { - return attempt + attempt } } - diff --git a/Tests/QueuesTests/ScheduleBuilderTests.swift b/Tests/QueuesTests/ScheduleBuilderTests.swift index acd0f2f..4eb4dd9 100644 --- a/Tests/QueuesTests/ScheduleBuilderTests.swift +++ b/Tests/QueuesTests/ScheduleBuilderTests.swift @@ -1,9 +1,12 @@ import Foundation import Queues import XCTest -import NIOCore final class ScheduleBuilderTests: XCTestCase { + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + func testHourlyBuilder() throws { let builder = ScheduleBuilder() builder.hourly().at(30) @@ -137,7 +140,6 @@ final class ScheduleBuilderTests: XCTestCase { } func testCustomCalendarBuilder() throws { - let est = Calendar.calendar(timezone: "EST") let mst = Calendar.calendar(timezone: "MST") @@ -153,14 +155,13 @@ final class ScheduleBuilderTests: XCTestCase { // one hour later Date(calendar: est, hour: 21, minute: 00) ) - } } final class Cleanup: ScheduledJob { func run(context: QueueContext) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + context.eventLoop.makeSucceededVoidFuture() } } diff --git a/Tests/QueuesTests/Utilities.swift b/Tests/QueuesTests/Utilities.swift new file mode 100644 index 0000000..90cb197 --- /dev/null +++ b/Tests/QueuesTests/Utilities.swift @@ -0,0 +1,15 @@ +import Logging +import class Foundation.ProcessInfo + +func env(_ name: String) -> String? { + return ProcessInfo.processInfo.environment[name] +} + +let isLoggingConfigured: Bool = { + LoggingSystem.bootstrap { label in + var handler = StreamLogHandler.standardOutput(label: label) + handler.logLevel = env("LOG_LEVEL").flatMap { .init(rawValue: $0) } ?? .info + return handler + } + return true +}()