diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 48b8aa7..04c492a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: jobs: unit-tests: - uses: vapor/ci/.github/workflows/run-unit-tests.yml@reusable-workflows + uses: vapor/ci/.github/workflows/run-unit-tests.yml@main with: - with_coverage: true with_tsan: false + secrets: inherit diff --git a/Package.swift b/Package.swift index aec0cdf..e3d8f64 100644 --- a/Package.swift +++ b/Package.swift @@ -6,8 +6,8 @@ let package = Package( platforms: [ .macOS(.v10_15), .iOS(.v13), - .tvOS(.v13), .watchOS(.v6), + .tvOS(.v13), ], products: [ .library(name: "Queues", targets: ["Queues"]), @@ -15,20 +15,37 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), - .package(url: "https://github.com/apple/swift-nio.git", from: "2.53.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), ], targets: [ - .target(name: "Queues", dependencies: [ - .product(name: "Vapor", package: "vapor"), - .product(name: "NIOCore", package: "swift-nio"), - ]), - .target(name: "XCTQueues", dependencies: [ - .target(name: "Queues") - ]), - .testTarget(name: "QueuesTests", dependencies: [ - .target(name: "Queues"), - .product(name: "XCTVapor", package: "vapor"), - .target(name: "XCTQueues") - ]), + .target( + name: "Queues", + dependencies: [ + .product(name: "Vapor", package: "vapor"), + .product(name: "NIOCore", package: "swift-nio"), + ], + swiftSettings: swiftSettings + ), + .target( + name: "XCTQueues", + dependencies: [ + .target(name: "Queues"), + ], + swiftSettings: swiftSettings + ), + .testTarget( + name: "QueuesTests", + dependencies: [ + .target(name: "Queues"), + .target(name: "XCTQueues"), + .product(name: "XCTVapor", package: "vapor"), + ], + swiftSettings: swiftSettings + ), ] ) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ConciseMagicFile"), +] } diff --git a/Package@swift-5.9.swift b/Package@swift-5.9.swift new file mode 100644 index 0000000..1092a83 --- /dev/null +++ b/Package@swift-5.9.swift @@ -0,0 +1,54 @@ +// swift-tools-version:5.9 +import PackageDescription + +let package = Package( + name: "queues", + platforms: [ + .macOS(.v10_15), + .iOS(.v13), + .watchOS(.v6), + .tvOS(.v13), + ], + products: [ + .library(name: "Queues", targets: ["Queues"]), + .library(name: "XCTQueues", targets: ["XCTQueues"]) + ], + dependencies: [ + .package(url: "https://github.com/vapor/vapor.git", from: "4.101.1"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.65.0"), + ], + targets: [ + .target( + name: "Queues", + dependencies: [ + .product(name: "Vapor", package: "vapor"), + .product(name: "NIOCore", package: "swift-nio"), + ], + swiftSettings: swiftSettings + ), + .target( + name: "XCTQueues", + dependencies: [ + .target(name: "Queues"), + ], + swiftSettings: swiftSettings + ), + .testTarget( + name: "QueuesTests", + dependencies: [ + .target(name: "Queues"), + .target(name: "XCTQueues"), + .product(name: "XCTVapor", package: "vapor"), + ], + swiftSettings: swiftSettings + ), + ] +) + +var swiftSettings: [SwiftSetting] { [ + .enableUpcomingFeature("ForwardTrailingClosures"), + .enableUpcomingFeature("ExistentialAny"), + .enableUpcomingFeature("ConciseMagicFile"), + .enableUpcomingFeature("DisableOutwardActorInference"), + .enableExperimentalFeature("StrictConcurrency=complete"), +] } diff --git a/README.md b/README.md index fac4104..7c713f7 100644 --- a/README.md +++ b/README.md @@ -1,29 +1,18 @@

- Queues -
-
- - Docs - - - Team Chat - - - MIT License - - - Continuous Integration - - - Swift 5.6 - - - Swift 5.8 - + + + + Queues + +
+
+Documentation +Team Chat +MIT License +Continuous Integration + +Swift 5.8+

+
diff --git a/Sources/Queues/Application+Queues.swift b/Sources/Queues/Application+Queues.swift index dc7d914..021d069 100644 --- a/Sources/Queues/Application+Queues.swift +++ b/Sources/Queues/Application+Queues.swift @@ -2,39 +2,56 @@ import Foundation import Logging import Vapor import NIO +import NIOConcurrencyHelpers extension Application { - /// The `Queues` object + /// The application-global ``Queues`` accessor. public var queues: Queues { .init(application: self) } - /// Represents a `Queues` configuration object + /// Contains global configuration for queues and provides methods for registering jobs and retrieving queues. public struct Queues { - - /// The provider of the `Queues` configuration + /// A provider for a ``Queues`` driver. public struct Provider { - let run: (Application) -> () + let run: @Sendable (Application) -> () - public init(_ run: @escaping (Application) -> ()) { + public init(_ run: @escaping @Sendable (Application) -> ()) { self.run = run } } - final class Storage { - public var configuration: QueuesConfiguration - private (set) var commands: [QueuesCommand] - var driver: QueuesDriver? + final class Storage: Sendable { + private struct Box: Sendable { + var configuration: QueuesConfiguration + var commands: [QueuesCommand] + var driver: (any QueuesDriver)? + } + private let box: NIOLockedValueBox + + public var configuration: QueuesConfiguration { + get { self.box.withLockedValue { $0.configuration } } + set { self.box.withLockedValue { $0.configuration = newValue } } + } + var commands: [QueuesCommand] { self.box.withLockedValue { $0.commands } } + var driver: (any QueuesDriver)? { + get { self.box.withLockedValue { $0.driver } } + set { self.box.withLockedValue { $0.driver = newValue } } + } public init(_ application: Application) { - self.configuration = .init(logger: application.logger) - let command: QueuesCommand = .init(application: application) - self.commands = [command] - application.commands.use(command, as: "queues") + let command = QueuesCommand(application: application) + + self.box = .init(.init( + configuration: .init(logger: application.logger), + commands: [command], + driver: nil + )) + application.asyncCommands.use(command, as: "queues") } public func add(command: QueuesCommand) { - self.commands.append(command) + self.box.withLockedValue { $0.commands.append(command) } } } @@ -44,35 +61,26 @@ extension Application { struct Lifecycle: LifecycleHandler { func shutdown(_ application: Application) { - application.queues.storage.commands.forEach({$0.shutdown()}) - if let driver = application.queues.storage.driver { - driver.shutdown() - } + application.queues.storage.commands.forEach { $0.shutdown() } + application.queues.storage.driver?.shutdown() } func shutdownAsync(_ application: Application) async { for command in application.queues.storage.commands { await command.asyncShutdown() } - if let driver = application.queues.storage.driver { - await driver.asyncShutdown() - } + await application.queues.storage.driver?.asyncShutdown() } } - /// The `QueuesConfiguration` object + /// The ``QueuesConfiguration`` object. public var configuration: QueuesConfiguration { get { self.storage.configuration } nonmutating set { self.storage.configuration = newValue } } - /// Returns the default `Queue` - public var queue: Queue { - self.queue(.default) - } - - /// The selected `QueuesDriver` - public var driver: QueuesDriver { + /// The selected ``QueuesDriver``. + public var driver: any QueuesDriver { guard let driver = self.storage.driver else { fatalError("No Queues driver configured. Configure with app.queues.use(...)") } @@ -88,7 +96,13 @@ extension Application { public let application: Application - /// Returns a `JobsQueue` + /// Get the default ``Queue``. + public var queue: any Queue { + self.queue(.default) + } + + /// Create or look up an instance of a named ``Queue``. + /// /// - Parameters: /// - name: The name of the queue /// - logger: A logger object @@ -96,71 +110,75 @@ extension Application { public func queue( _ name: QueueName, logger: Logger? = nil, - on eventLoop: EventLoop? = nil - ) -> Queue { - return self.driver.makeQueue( - with: .init( - queueName: name, - configuration: self.configuration, - application: self.application, - logger: logger ?? self.application.logger, - on: eventLoop ?? self.application.eventLoopGroup.next() - ) - ) + on eventLoop: (any EventLoop)? = nil + ) -> any Queue { + self.driver.makeQueue(with: .init( + queueName: name, + configuration: self.configuration, + application: self.application, + logger: logger ?? self.application.logger, + on: eventLoop ?? self.application.eventLoopGroup.any() + )) } - /// Adds a new queued job - /// - Parameter job: The job to add - public func add(_ job: J) where J: Job { + /// Add a new queueable job. + /// + /// This must be called once for each job type that can be queued. + /// + /// - Parameter job: The job to add. + public func add(_ job: some Job) { self.configuration.add(job) } - /// Adds a new notification hook - /// - Parameter hook: The hook object to add - public func add(_ hook: N) where N: JobEventDelegate { + /// Add a new notification hook. + /// + /// - Parameter hook: The hook to add. + public func add(_ hook: some JobEventDelegate) { self.configuration.add(hook) } - /// Choose which provider to use - /// - Parameter provider: The provider + /// Choose which provider to use. + /// + /// - Parameter provider: The provider. public func use(_ provider: Provider) { provider.run(self.application) } - /// Choose which driver to use + /// Configure a driver. + /// /// - Parameter driver: The driver - public func use(custom driver: QueuesDriver) { + public func use(custom driver: any QueuesDriver) { self.storage.driver = driver } - /// Schedule a new job - /// - Parameter job: The job to schedule - public func schedule(_ job: J) -> ScheduleBuilder - where J: ScheduledJob - { - let builder = ScheduleBuilder() - _ = self.storage.configuration.schedule(job, builder: builder) - return builder + /// Schedule a new job. + /// + /// - Parameter job: The job to schedule. + public func schedule(_ job: some ScheduledJob) -> ScheduleBuilder { + self.storage.configuration.schedule(job) } - /// Starts an in-process worker to dequeue and run jobs - /// - Parameter queue: The queue to run the jobs on. Defaults to `default` + /// Starts an in-process worker to dequeue and run jobs. + /// + /// - Parameter queue: The queue to run the jobs on. Defaults to ``QueueName/default``. public func startInProcessJobs(on queue: QueueName = .default) throws { - let inProcessJobs = QueuesCommand(application: application, scheduled: false) + let inProcessJobs = QueuesCommand(application: self.application) + try inProcessJobs.startJobs(on: queue) self.storage.add(command: inProcessJobs) } - /// Starts an in-process worker to run scheduled jobs + /// Starts an in-process worker to run scheduled jobs. public func startScheduledJobs() throws { - let scheduledJobs = QueuesCommand(application: application, scheduled: true) + let scheduledJobs = QueuesCommand(application: self.application) + try scheduledJobs.startScheduledJobs() self.storage.add(command: scheduledJobs) } func initialize() { self.application.lifecycle.use(Lifecycle()) - self.application.storage[Key.self] = .init(application) + self.application.storage[Key.self] = .init(self.application) } } } diff --git a/Sources/Queues/AsyncJob.swift b/Sources/Queues/AsyncJob.swift index 197e77e..ba84375 100644 --- a/Sources/Queues/AsyncJob.swift +++ b/Sources/Queues/AsyncJob.swift @@ -4,88 +4,49 @@ import Foundation /// A task that can be queued for future execution. public protocol AsyncJob: Job { - /// The data associated with a job associatedtype Payload /// Called when it's this Job's turn to be dequeued. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services - /// - payload: The data for this handler + /// - context: The ``QueueContext``. + /// - payload: The typed job payload. func dequeue( _ context: QueueContext, _ payload: Payload ) async throws /// Called when there is an error at any stage of the Job's execution. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services + /// - context: The ``QueueContext``. /// - error: The error returned by the job. - /// - payload: The typed payload for the job + /// - payload: The typed job payload. func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) async throws - - /// Called when there was an error and job will be retired. - /// - /// - Parameters: - /// - attempt: Number of job attempts which failed - /// - Returns: Number of seconds for which next retry will be delayed. - /// Return `-1` if you want to retry job immediately without putting it back to the queue. - func nextRetryIn(attempt: Int) -> Int - - static func serializePayload(_ payload: Payload) throws -> [UInt8] - static func parsePayload(_ bytes: [UInt8]) throws -> Payload } -extension AsyncJob where Payload: Codable { - - /// Serialize a payload into Data - /// - Parameter payload: The payload - public static func serializePayload(_ payload: Payload) throws -> [UInt8] { - try .init(JSONEncoder().encode(payload)) - } - - /// Parse bytes into the payload - /// - Parameter bytes: The Payload - public static func parsePayload(_ bytes: [UInt8]) throws -> Payload { - try JSONDecoder().decode(Payload.self, from: .init(bytes)) - } +extension AsyncJob { + /// Default implementation of ``AsyncJob/error(_:_:_:)-8627d``. + public func error(_ context: QueueContext, _ error: any Error, _ payload: Payload) async throws {} } extension AsyncJob { - /// The jobName of the Job - public static var name: String { - return String(describing: Self.self) - } - - /// See `Job`.`nextRetryIn` - public func nextRetryIn(attempt: Int) -> Int { - return -1 - } - - public func _nextRetryIn(attempt: Int) -> Int { - return nextRetryIn(attempt: attempt) - } - + /// Forward ``Job/dequeue(_:_:)`` to ``AsyncJob/dequeue(_:_:)-9g26t``. public func dequeue(_ context: QueueContext, _ payload: Payload) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + context.eventLoop.makeFutureWithTask { try await self.dequeue(context, payload) } - return promise.futureResult } - public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + /// Forward ``Job/error(_:_:_:)-2brrj`` to ``AsyncJob/error(_:_:_:)-8627d`` + public func error(_ context: QueueContext, _ error: any Error, _ payload: Payload) -> EventLoopFuture { + context.eventLoop.makeFutureWithTask { try await self.error(context, error, payload) } - return promise.futureResult } - public func error(_ context: QueueContext, _ error: Error, _ payload: Payload) async throws { - return - } } diff --git a/Sources/Queues/AsyncJobEventDelegate.swift b/Sources/Queues/AsyncJobEventDelegate.swift index d76fad4..6aa1711 100644 --- a/Sources/Queues/AsyncJobEventDelegate.swift +++ b/Sources/Queues/AsyncJobEventDelegate.swift @@ -21,34 +21,34 @@ public protocol AsyncJobEventDelegate: JobEventDelegate { /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail - func error(jobId: String, error: Error) async throws + func error(jobId: String, error: any Error) async throws } extension AsyncJobEventDelegate { public func dispatched(job: JobEventData) async throws { } public func didDequeue(jobId: String) async throws { } public func success(jobId: String) async throws { } - public func error(jobId: String, error: Error) async throws { } + public func error(jobId: String, error: any Error) async throws { } - public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { + public func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.dispatched(job: job) } } - public func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { + public func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.didDequeue(jobId: jobId) } } - public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { + public func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.success(jobId: jobId) } } - public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { + public func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { eventLoop.makeFutureWithTask { try await self.error(jobId: jobId, error: error) } diff --git a/Sources/Queues/AsyncQueue.swift b/Sources/Queues/AsyncQueue.swift new file mode 100644 index 0000000..5593408 --- /dev/null +++ b/Sources/Queues/AsyncQueue.swift @@ -0,0 +1,90 @@ +import Foundation +import Vapor +import NIOCore + +public protocol AsyncQueue: Queue { + /// The job context + var context: QueueContext { get } + + /// Gets the next job to be run + /// - Parameter id: The ID of the job + func get(_ id: JobIdentifier) async throws -> JobData + + /// Sets a job that should be run in the future + /// - Parameters: + /// - id: The ID of the job + /// - data: Data for the job + func set(_ id: JobIdentifier, to data: JobData) async throws + + /// Removes a job from the queue + /// - Parameter id: The ID of the job + func clear(_ id: JobIdentifier) async throws + + /// Pops the next job in the queue + func pop() async throws -> JobIdentifier? + + /// Pushes the next job into a queue + /// - Parameter id: The ID of the job + func push(_ id: JobIdentifier) async throws +} + +extension AsyncQueue { + public func get(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.get(id) } + } + + public func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.set(id, to: data) } + } + + public func clear(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.clear(id) } + } + + public func pop() -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.pop() } + } + + public func push(_ id: JobIdentifier) -> EventLoopFuture { + self.context.eventLoop.makeFutureWithTask { try await self.push(id) } + } +} + +extension Queue { + /// Dispatch a job into the queue for processing + /// - Parameters: + /// - job: The Job type + /// - payload: The payload data to be dispatched + /// - maxRetryCount: Number of times to retry this job on failure + /// - delayUntil: Delay the processing of this job until a certain date + public func dispatch( + _ job: J.Type, + _ payload: J.Payload, + maxRetryCount: Int = 0, + delayUntil: Date? = nil, + id: JobIdentifier = .init() + ) async throws { + var logger = self.logger + logger[metadataKey: "queue"] = "\(self.queueName.string)" + logger[metadataKey: "job-id"] = "\(id.string)" + logger[metadataKey: "job-name"] = "\(J.name)" + + let storage = JobData( + payload: try J.serializePayload(payload), + maxRetryCount: maxRetryCount, + jobName: J.name, + delayUntil: delayUntil, + queuedAt: .init() + ) + + logger.trace("Storing job data") + try await self.set(id, to: storage).get() + logger.trace("Pusing job to queue") + try await self.push(id).get() + logger.info("Dispatched job") + + await self.sendNotification(of: "dispatch", logger: logger) { + try await $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop).get() + } + } +} diff --git a/Sources/Queues/AsyncScheduledJob.swift b/Sources/Queues/AsyncScheduledJob.swift index 14af89f..1041eaa 100644 --- a/Sources/Queues/AsyncScheduledJob.swift +++ b/Sources/Queues/AsyncScheduledJob.swift @@ -15,10 +15,8 @@ extension AsyncScheduledJob { public var name: String { "\(Self.self)" } public func run(context: QueueContext) -> EventLoopFuture { - let promise = context.eventLoop.makePromise(of: Void.self) - promise.completeWithTask { + context.eventLoop.makeFutureWithTask { try await self.run(context: context) } - return promise.futureResult } } diff --git a/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg b/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg new file mode 100644 index 0000000..f40f693 --- /dev/null +++ b/Sources/Queues/Docs.docc/Resources/vapor-queues-logo.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + diff --git a/Sources/Queues/Docs.docc/theme-settings.json b/Sources/Queues/Docs.docc/theme-settings.json new file mode 100644 index 0000000..1e0af2e --- /dev/null +++ b/Sources/Queues/Docs.docc/theme-settings.json @@ -0,0 +1,21 @@ +{ + "theme": { + "aside": { "border-radius": "16px", "border-style": "double", "border-width": "3px" }, + "border-radius": "0", + "button": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "code": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "color": { + "queues": "#e8665a", + "documentation-intro-fill": "radial-gradient(circle at top, var(--color-queues) 30%, #000 100%)", + "documentation-intro-accent": "var(--color-queues)", + "logo-base": { "dark": "#fff", "light": "#000" }, + "logo-shape": { "dark": "#000", "light": "#fff" }, + "fill": { "dark": "#000", "light": "#fff" } + }, + "icons": { "technology": "/queues/images/vapor-queues-logo.svg" } + }, + "features": { + "quickNavigation": { "enable": true }, + "i18n": { "enable": true } + } +} diff --git a/Sources/Queues/Exports.swift b/Sources/Queues/Exports.swift index 3d38063..1008b89 100644 --- a/Sources/Queues/Exports.swift +++ b/Sources/Queues/Exports.swift @@ -1,19 +1,6 @@ -#if swift(>=5.8) - @_documentation(visibility: internal) @_exported import struct Foundation.Date @_documentation(visibility: internal) @_exported import struct Logging.Logger -@_documentation(visibility: internal) @_exported import class NIO.EventLoopFuture -@_documentation(visibility: internal) @_exported import struct NIO.EventLoopPromise -@_documentation(visibility: internal) @_exported import protocol NIO.EventLoop -@_documentation(visibility: internal) @_exported import struct NIO.TimeAmount - -#else - -@_exported import struct Foundation.Date -@_exported import struct Logging.Logger -@_exported import class NIO.EventLoopFuture -@_exported import struct NIO.EventLoopPromise -@_exported import protocol NIO.EventLoop -@_exported import struct NIO.TimeAmount - -#endif +@_documentation(visibility: internal) @_exported import class NIOCore.EventLoopFuture +@_documentation(visibility: internal) @_exported import struct NIOCore.EventLoopPromise +@_documentation(visibility: internal) @_exported import protocol NIOCore.EventLoop +@_documentation(visibility: internal) @_exported import struct NIOCore.TimeAmount diff --git a/Sources/Queues/Job.swift b/Sources/Queues/Job.swift index 80dfc68..7cd2937 100644 --- a/Sources/Queues/Job.swift +++ b/Sources/Queues/Job.swift @@ -6,105 +6,129 @@ import Vapor /// A task that can be queued for future execution. public protocol Job: AnyJob { /// The data associated with a job - associatedtype Payload + associatedtype Payload: Sendable /// Called when it's this Job's turn to be dequeued. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services - /// - payload: The data for this handler + /// - context: The ``QueueContext``. + /// - payload: The typed job payload. func dequeue( _ context: QueueContext, _ payload: Payload ) -> EventLoopFuture /// Called when there is an error at any stage of the Job's execution. + /// /// - Parameters: - /// - context: The JobContext. Can be used to store and retrieve services + /// - context: The ``QueueContext``. /// - error: The error returned by the job. - /// - payload: The typed payload for the job + /// - payload: The typed job payload. func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) -> EventLoopFuture - /// Called when there was an error and job will be retired. + /// Called when there was an error and the job will be retried. /// - /// - Parameters: - /// - attempt: Number of job attempts which failed - /// - Returns: Number of seconds for which next retry will be delayed. - /// Return `-1` if you want to retry job immediately without putting it back to the queue. + /// - Parameter attempt: Number of job attempts which have failed so far. + /// - Returns: Number of seconds to delay the next retry. Return `0` to place the job back on the queue with no + /// delay added. If this method is not implemented, the default is `0`. Returning `-1` is the same as `0`. func nextRetryIn(attempt: Int) -> Int + /// Serialize a typed payload to an array of bytes. When `Payload` is `Codable`, this method will default to + /// encoding to JSON. + /// + /// - Parameter payload: The payload to serialize. + /// - Returns: The array of serialized bytes. static func serializePayload(_ payload: Payload) throws -> [UInt8] + + /// Deserialize an array of bytes into a typed payload. When `Payload` is `Codable`, this method will default to + /// decoding from JSON. + /// + /// - Parameter bytes: The serialized bytes to decode. + /// - Returns: A decoded payload. static func parsePayload(_ bytes: [UInt8]) throws -> Payload } -extension Job where Payload: Codable { - - /// Serialize a payload into Data - /// - Parameter payload: The payload +extension Job where Payload: Codable { + /// Default implementation for ``Job/serializePayload(_:)-4uro2``. public static func serializePayload(_ payload: Payload) throws -> [UInt8] { try .init(JSONEncoder().encode(payload)) } - /// Parse bytes into the payload - /// - Parameter bytes: The Payload + /// Default implementation for ``Job/parsePayload(_:)-9tn3a``. public static func parsePayload(_ bytes: [UInt8]) throws -> Payload { try JSONDecoder().decode(Payload.self, from: .init(bytes)) } } extension Job { - /// The jobName of the Job + /// Default implementation for ``AnyJob/name``. public static var name: String { - return String(describing: Self.self) + String(describing: Self.self) } - /// See `Job`.`error` + /// Default implementation for ``Job/error(_:_:_:)-jzgw``. public func error( _ context: QueueContext, - _ error: Error, + _ error: any Error, _ payload: Payload ) -> EventLoopFuture { - context.eventLoop.makeSucceededFuture(()) + context.eventLoop.makeSucceededVoidFuture() } - /// See `Job`.`nextRetryIn` + /// Default implementation for ``Job/nextRetryIn(attempt:)-5gc93``. public func nextRetryIn(attempt: Int) -> Int { - return -1 + 0 } +} +/// A type-erased version of ``Job``. +public protocol AnyJob: Sendable { + /// The name of the job. + static var name: String { get } + + /// Perform ``Job/dequeue(_:_:)`` after deserializing the raw payload bytes. + func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture + + /// Perform ``Job/error(_:_:_:)-2brrj`` after deserializing the raw payload bytes. + func _error(_ context: QueueContext, id: String, _ error: any Error, payload: [UInt8]) -> EventLoopFuture + + /// Type-erased accessor for ``Job/nextRetryIn(attempt:)-5gc93``. + func _nextRetryIn(attempt: Int) -> Int +} + +// N.B. These should really not be public. +extension Job { + // See `AnyJob._nextRetryIn(attempt:)`. public func _nextRetryIn(attempt: Int) -> Int { - return nextRetryIn(attempt: attempt) + self.nextRetryIn(attempt: attempt) } - public func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture { - var contextCopy = context - contextCopy.logger[metadataKey: "job_id"] = .string(id) + // See `AnyJob._error(_:id:_:payload:)`. + public func _error(_ context: QueueContext, id: String, _ error: any Error, payload: [UInt8]) -> EventLoopFuture { + var context = context + context.logger[metadataKey: "queue"] = "\(context.queueName.string)" + context.logger[metadataKey: "job_id"] = "\(id)" do { - return try self.error(contextCopy, error, Self.parsePayload(payload)) + return try self.error(context, error, Self.parsePayload(payload)) } catch { return context.eventLoop.makeFailedFuture(error) } } + // See `AnyJob._dequeue(_:id:payload:)`. public func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture { - var contextCopy = context - contextCopy.logger[metadataKey: "job_id"] = .string(id) + var context = context + context.logger[metadataKey: "queue"] = "\(context.queueName.string)" + context.logger[metadataKey: "job_id"] = "\(id)" do { - return try self.dequeue(contextCopy, Self.parsePayload(payload)) + return try self.dequeue(context, Self.parsePayload(payload)) } catch { return context.eventLoop.makeFailedFuture(error) } } } -/// A type-erased version of `Job` -public protocol AnyJob { - /// The name of the `Job` - static var name: String { get } - func _dequeue(_ context: QueueContext, id: String, payload: [UInt8]) -> EventLoopFuture - func _error(_ context: QueueContext, id: String, _ error: Error, payload: [UInt8]) -> EventLoopFuture - func _nextRetryIn(attempt: Int) -> Int -} diff --git a/Sources/Queues/JobData.swift b/Sources/Queues/JobData.swift index ba228dd..ab32c33 100644 --- a/Sources/Queues/JobData.swift +++ b/Sources/Queues/JobData.swift @@ -1,7 +1,7 @@ import Foundation /// Holds information about the Job that is to be encoded to the persistence store. -public struct JobData: Codable { +public struct JobData: Codable, Sendable { /// The job data to be encoded. public let payload: [UInt8] @@ -29,6 +29,9 @@ public struct JobData: Codable { queuedAt: Date, attempts: Int = 0 ) { + assert(maxRetryCount >= 0) + assert(attempts >= 0) + self.payload = payload self.maxRetryCount = maxRetryCount self.jobName = jobName @@ -37,3 +40,34 @@ public struct JobData: Codable { self.attempts = attempts } } + +// N.B.: These methods are intended for internal use only. +extension JobData { + /// The non-`nil` number of attempts made to run this job (how many times has it failed). + /// This can also be treated as a "retry" count. + /// + /// Value | Meaning + /// -|- + /// 0 | The job has never run, or succeeded on its first attempt + /// 1 | The job has failed once and is queued for its first retry + /// 2... | The job has failed N times and is queued for its Nth retry + var failureCount: Int { + self.attempts ?? 0 + } + + /// The number of retries left iff the current (re)try fails. + var remainingAttempts: Int { + Swift.max(0, self.maxRetryCount - self.failureCount) + } + + /// The current attempt number. + /// + /// Value|Meaning + /// -|- + /// 0|Not valid + /// 1|The job has not failed thus far; this the first attempt. + /// 2|The job has failed once; this is the second attempt. + var currentAttempt: Int { + self.failureCount + 1 + } +} diff --git a/Sources/Queues/JobIdentifier.swift b/Sources/Queues/JobIdentifier.swift index c811d14..3739220 100644 --- a/Sources/Queues/JobIdentifier.swift +++ b/Sources/Queues/JobIdentifier.swift @@ -1,8 +1,7 @@ import struct Foundation.UUID /// An identifier for a job -public struct JobIdentifier: Hashable, Equatable { - +public struct JobIdentifier: Hashable, Equatable, Sendable { /// The string value of the ID public let string: String diff --git a/Sources/Queues/NotificationHook.swift b/Sources/Queues/NotificationHook.swift index ea317d9..54724e6 100644 --- a/Sources/Queues/NotificationHook.swift +++ b/Sources/Queues/NotificationHook.swift @@ -2,55 +2,54 @@ import NIOCore import Foundation /// Represents an object that can receive notifications about job statuses -public protocol JobEventDelegate { - +public protocol JobEventDelegate: Sendable { /// Called when the job is first dispatched /// - Parameters: /// - job: The `JobData` associated with the job /// - eventLoop: The eventLoop - func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture + func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job is dequeued /// - Parameters: /// - jobId: The id of the Job /// - eventLoop: The eventLoop - func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job succeeds /// - Parameters: /// - jobId: The id of the Job /// - eventLoop: The eventLoop - func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture + func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture /// Called when the job returns an error /// - Parameters: /// - jobId: The id of the Job /// - error: The error that caused the job to fail /// - eventLoop: The eventLoop - func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture + func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture } extension JobEventDelegate { - public func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } - public func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { - eventLoop.future() + public func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + eventLoop.makeSucceededVoidFuture() } } /// Data on a job sent via a notification -public struct JobEventData { +public struct JobEventData: Sendable { /// The id of the job, assigned at dispatch public var id: String diff --git a/Sources/Queues/Queue+Async.swift b/Sources/Queues/Queue+Async.swift deleted file mode 100644 index 71f1775..0000000 --- a/Sources/Queues/Queue+Async.swift +++ /dev/null @@ -1,21 +0,0 @@ -import Foundation -import Vapor -import NIOCore - -extension Queue { - /// Dispatch a job into the queue for processing - /// - Parameters: - /// - job: The Job type - /// - payload: The payload data to be dispatched - /// - maxRetryCount: Number of times to retry this job on failure - /// - delayUntil: Delay the processing of this job until a certain date - public func dispatch( - _ job: J.Type, - _ payload: J.Payload, - maxRetryCount: Int = 0, - delayUntil: Date? = nil, - id: JobIdentifier = JobIdentifier() - ) async throws where J: Job { - try await self.dispatch(job, payload, maxRetryCount: maxRetryCount, delayUntil: delayUntil, id: id).get() - } -} diff --git a/Sources/Queues/Queue.swift b/Sources/Queues/Queue.swift index cfef68f..efee13e 100644 --- a/Sources/Queues/Queue.swift +++ b/Sources/Queues/Queue.swift @@ -3,7 +3,7 @@ import Logging import Foundation /// A type that can store and retrieve jobs from a persistence layer -public protocol Queue { +public protocol Queue: Sendable { /// The job context var context: QueueContext { get } @@ -31,7 +31,7 @@ public protocol Queue { extension Queue { /// The EventLoop for a job queue - public var eventLoop: EventLoop { + public var eventLoop: any EventLoop { self.context.eventLoop } @@ -61,22 +61,26 @@ extension Queue { /// - payload: The payload data to be dispatched /// - maxRetryCount: Number of times to retry this job on failure /// - delayUntil: Delay the processing of this job until a certain date - public func dispatch( + public func dispatch( _ job: J.Type, _ payload: J.Payload, maxRetryCount: Int = 0, delayUntil: Date? = nil, - id: JobIdentifier = JobIdentifier() - ) -> EventLoopFuture - where J: Job - { + id: JobIdentifier = .init() + ) -> EventLoopFuture { + var logger_ = self.logger + logger_[metadataKey: "queue"] = "\(self.queueName.string)" + logger_[metadataKey: "job-id"] = "\(id.string)" + logger_[metadataKey: "job-name"] = "\(J.name)" + let logger = logger_ + let bytes: [UInt8] do { bytes = try J.serializePayload(payload) } catch { return self.eventLoop.makeFailedFuture(error) } - logger.trace("Serialized bytes for payload: \(bytes)") + let storage = JobData( payload: bytes, maxRetryCount: maxRetryCount, @@ -84,21 +88,44 @@ extension Queue { delayUntil: delayUntil, queuedAt: Date() ) - logger.trace("Adding the ID to the storage") + + logger.trace("Storing job data") return self.set(id, to: storage).flatMap { - self.push(id) - }.flatMap { _ in - self.logger.info("Dispatched queue job", metadata: [ - "job_id": .string(id.string), - "job_name": .string(job.name), - "queue": .string(self.queueName.string) - ]) + logger.trace("Pusing job to queue") + return self.push(id) + }.flatMapWithEventLoop { _, eventLoop in + logger.info("Dispatched job") + return self.sendNotification(of: "dispatch", logger: logger) { + $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: eventLoop) + } + } + } +} + +extension Queue { + func sendNotification( + of kind: String, logger: Logger, + _ notification: @escaping @Sendable (_ hook: any JobEventDelegate) -> EventLoopFuture + ) -> EventLoopFuture { + logger.trace("Sending notification", metadata: ["kind": "\(kind)"]) + return self.configuration.notificationHooks.map { + notification($0).flatMapErrorWithEventLoop { error, eventLoop in + logger.warning("Failed to send notification", metadata: ["kind": "\(kind)", "error": "\(error)"]) + return eventLoop.makeSucceededVoidFuture() + } + }.flatten(on: self.eventLoop) + } - return self.configuration.notificationHooks.map { - $0.dispatched(job: .init(id: id.string, queueName: self.queueName.string, jobData: storage), eventLoop: self.eventLoop) - }.flatten(on: self.eventLoop).flatMapError { error in - self.logger.error("Could not send dispatched notification: \(error)") - return self.eventLoop.future() + func sendNotification( + of kind: String, logger: Logger, + _ notification: @escaping @Sendable (_ hook: any JobEventDelegate) async throws -> Void + ) async { + logger.trace("Sending notification", metadata: ["kind": "\(kind)"]) + for hook in self.configuration.notificationHooks { + do { + try await notification(hook) + } catch { + logger.warning("Failed to send notification", metadata: ["kind": "\(kind)", "error": "\(error)"]) } } } diff --git a/Sources/Queues/QueueContext.swift b/Sources/Queues/QueueContext.swift index 6ef924e..bad1db9 100644 --- a/Sources/Queues/QueueContext.swift +++ b/Sources/Queues/QueueContext.swift @@ -3,7 +3,7 @@ import NIOCore import Vapor /// The context for a queue. -public struct QueueContext { +public struct QueueContext: Sendable { /// The name of the queue public let queueName: QueueName @@ -17,7 +17,7 @@ public struct QueueContext { public var logger: Logger /// An event loop to run the process on - public let eventLoop: EventLoop + public let eventLoop: any EventLoop /// Creates a new JobContext /// - Parameters: @@ -31,7 +31,7 @@ public struct QueueContext { configuration: QueuesConfiguration, application: Application, logger: Logger, - on eventLoop: EventLoop + on eventLoop: any EventLoop ) { self.queueName = queueName self.configuration = configuration @@ -41,13 +41,13 @@ public struct QueueContext { } /// Returns the default job `Queue` - public var queue: Queue { + public var queue: any Queue { self.queues(.default) } /// Returns the specific job `Queue` for the given queue name /// - Parameter queue: The queue name - public func queues(_ queue: QueueName) -> Queue { + public func queues(_ queue: QueueName) -> any Queue { self.application.queues.queue( queue, logger: self.logger, diff --git a/Sources/Queues/QueueName.swift b/Sources/Queues/QueueName.swift index a7bfe8b..1941666 100644 --- a/Sources/Queues/QueueName.swift +++ b/Sources/Queues/QueueName.swift @@ -1,23 +1,23 @@ /// A specific queue that jobs are run on. -public struct QueueName { +public struct QueueName: Sendable { /// The default queue that jobs are run on - public static let `default` = QueueName(string: "default") + public static let `default` = Self(string: "default") /// The name of the queue public let string: String - /// Creates a new `QueueType` + /// Creates a new ``QueueName`` /// - /// - Parameter name: The name of the `QueueType` + /// - Parameter name: The name of the queue public init(string: String) { self.string = string } /// Makes the name of the queue /// - /// - Parameter persistanceKey: The base persistence key + /// - Parameter persistenceKey: The base persistence key /// - Returns: A string of the queue's fully qualified name - public func makeKey(with persistanceKey: String) -> String { - return persistanceKey + "[\(self.string)]" + public func makeKey(with persistenceKey: String) -> String { + "\(persistenceKey)[\(self.string)]" } } diff --git a/Sources/Queues/QueueWorker.swift b/Sources/Queues/QueueWorker.swift index b375188..fe76246 100644 --- a/Sources/Queues/QueueWorker.swift +++ b/Sources/Queues/QueueWorker.swift @@ -8,182 +8,94 @@ extension Queue { } } -/// The worker that runs the `Job` -public struct QueueWorker { - let queue: Queue +/// The worker that runs ``Job``s. +public struct QueueWorker: Sendable { + let queue: any Queue - init(queue: Queue) { - self.queue = queue + /// Actually run the queue. This is a thin wrapper for ELF-style callers. + public func run() -> EventLoopFuture { + self.queue.eventLoop.makeFutureWithTask { + try await self.run() + } } - /// Logic to run the queue - public func run() -> EventLoopFuture { - queue.logger.trace("Popping job from queue") - return self.queue.pop().flatMap { id in - //No job found, go to the next iteration - guard let id = id else { - self.queue.logger.trace("Did not receive ID from pop") - return self.queue.eventLoop.makeSucceededFuture(()) - } - - self.queue.logger.trace("Received job \(id)") - self.queue.logger.trace("Getting data for job \(id)") - - return self.queue.get(id).flatMap { data in - var logger = self.queue.logger - logger[metadataKey: "job_id"] = .string(id.string) - - logger.trace("Received job data for \(id): \(data)") - // If the job has a delay, we must check to make sure we can execute. - // If the delay has not passed yet, requeue the job - if let delay = data.delayUntil, delay >= Date() { - logger.trace("Requeueing job \(id) for execution later because the delayUntil value of \(delay) has not passed yet") - return self.queue.push(id) - } + /// Pop a job off the queue and try to run it. If no jobs are available, do nothing. + public func run() async throws { + var logger = self.queue.logger + logger[metadataKey: "queue"] = "\(self.queue.queueName.string)" + logger.trace("Popping job from queue") + + guard let id = try await self.queue.pop().get() else { + // No job found, go around again. + return logger.trace("No pending jobs") + } - guard let job = self.queue.configuration.jobs[data.jobName] else { - logger.error("No job named \(data.jobName) is registered") - return self.queue.eventLoop.makeSucceededFuture(()) - } + logger[metadataKey: "job-id"] = "\(id.string)" + logger.trace("Found pending job") + + let data = try await self.queue.get(id).get() + logger.trace("Received job data", metadata: ["job-data": "\(data)"]) + logger[metadataKey: "job-name"] = "\(data.jobName)" - logger.trace("Sending dequeued notification hooks") + guard let job = self.queue.configuration.jobs[data.jobName] else { + logger.warning("No job with the desired name is registered, discarding") + return try await self.queue.clear(id).get() + } - return self.queue.configuration.notificationHooks.map { - $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop) - }.flatten(on: self.queue.eventLoop).flatMapError { error in - logger.error("Could not send didDequeue notification: \(error)") - return self.queue.eventLoop.future() - }.flatMap { _ in - logger.info("Dequeueing job", metadata: [ - "job_id": .string(id.string), - "job_name": .string(data.jobName), - "queue": .string(self.queue.queueName.string) - ]) + // If the job has a delay that isn't up yet, requeue it. + guard (data.delayUntil ?? .distantPast) < Date() else { + logger.trace("Job is delayed, requeueing for later execution", metadata: ["delayed-until": "\(data.delayUntil ?? .distantPast)"]) + return try await self.queue.push(id).get() + } - return self.run( - id: id, - name: data.jobName, - job: job, - payload: data.payload, - logger: logger, - remainingTries: data.maxRetryCount, - attempts: data.attempts, - jobData: data - ) - } - } + await self.queue.sendNotification(of: "dequeue", logger: logger) { + try await $0.didDequeue(jobId: id.string, eventLoop: self.queue.eventLoop).get() } + + try await self.run(id: id, job: job, jobData: data, logger: logger) } - private func run( - id: JobIdentifier, - name: String, - job: AnyJob, - payload: [UInt8], - logger: Logger, - remainingTries: Int, - attempts: Int?, - jobData: JobData - ) -> EventLoopFuture { - logger.trace("Running the queue job (remaining tries: \(remainingTries)") - let futureJob = job._dequeue(self.queue.context, id: id.string, payload: payload) - return futureJob.flatMap { complete in - logger.trace("Ran job successfully") - logger.trace("Sending success notification hooks") - return self.queue.configuration.notificationHooks.map { - $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop) - }.flatten(on: self.queue.context.eventLoop).flatMapError { error in - self.queue.logger.error("Could not send success notification: \(error)") - return self.queue.context.eventLoop.future() - }.flatMap { - logger.trace("Job done being run") - return self.queue.clear(id) + private func run(id: JobIdentifier, job: any AnyJob, jobData: JobData, logger: Logger) async throws { + logger.info("Dequeing and running job", metadata: ["attempt": "\(jobData.currentAttempt)", "retries-left": "\(jobData.remainingAttempts)"]) + do { + try await job._dequeue(self.queue.context, id: id.string, payload: jobData.payload).get() + + logger.trace("Job ran successfully", metadata: ["attempts-made": "\(jobData.currentAttempt)"]) + await self.queue.sendNotification(of: "success", logger: logger) { + try await $0.success(jobId: id.string, eventLoop: self.queue.context.eventLoop).get() } - }.flatMapError { error in - logger.trace("Job failed (remaining tries: \(remainingTries)") - if remainingTries == 0 { - logger.error("Job failed with error: \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) + } catch { + if jobData.remainingAttempts > 0 { + // N.B.: `return` from here so we don't clear the job data. + return try await self.retry(id: id, job: job, jobData: jobData, error: error, logger: logger) + } else { + logger.warning("Job failed, no retries remaining", metadata: ["error": "\(error)", "attempts-made": "\(jobData.currentAttempt)"]) - logger.trace("Sending failure notification hooks") - return job._error(self.queue.context, id: id.string, error, payload: payload).flatMap { _ in - return self.queue.configuration.notificationHooks.map { - $0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop) - }.flatten(on: self.queue.context.eventLoop).flatMapError { error in - self.queue.logger.error("Failed to send error notification: \(error)") - return self.queue.context.eventLoop.future() - }.flatMap { - logger.trace("Job done being run") - return self.queue.clear(id) - } + try await job._error(self.queue.context, id: id.string, error, payload: jobData.payload).get() + await self.queue.sendNotification(of: "failure", logger: logger) { + try await $0.error(jobId: id.string, error: error, eventLoop: self.queue.context.eventLoop).get() } - } else { - return self.retry( - id: id, - name: name, - job: job, - payload: payload, - logger: logger, - remainingTries: remainingTries, - attempts: attempts, - jobData: jobData, - error: error - ) } } + try await self.queue.clear(id).get() } - private func retry( - id: JobIdentifier, - name: String, - job: AnyJob, - payload: [UInt8], - logger: Logger, - remainingTries: Int, - attempts: Int?, - jobData: JobData, - error: Error - ) -> EventLoopFuture { - let attempts = attempts ?? 0 - let delayInSeconds = job._nextRetryIn(attempt: attempts + 1) - if delayInSeconds == -1 { - logger.error("Job failed, retrying... \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) - return self.run( - id: id, - name: name, - job: job, - payload: payload, - logger: logger, - remainingTries: remainingTries - 1, - attempts: attempts + 1, - jobData: jobData - ) - } else { - logger.error("Job failed, retrying in \(delayInSeconds)s... \(error)", metadata: [ - "job_id": .string(id.string), - "job_name": .string(name), - "queue": .string(self.queue.queueName.string) - ]) - let storage = JobData( - payload: jobData.payload, - maxRetryCount: remainingTries - 1, - jobName: jobData.jobName, - delayUntil: Date(timeIntervalSinceNow: Double(delayInSeconds)), - queuedAt: jobData.queuedAt, - attempts: attempts + 1 - ) - return self.queue.clear(id).flatMap { - self.queue.set(id, to: storage) - }.flatMap { - self.queue.push(id) - } - } + private func retry(id: JobIdentifier, job: any AnyJob, jobData: JobData, error: any Error, logger: Logger) async throws { + let delay = Swift.max(0, job._nextRetryIn(attempt: jobData.currentAttempt)) + let updatedData = JobData( + payload: jobData.payload, + maxRetryCount: jobData.maxRetryCount, + jobName: jobData.jobName, + delayUntil: .init(timeIntervalSinceNow: Double(delay)), + queuedAt: .init(), + attempts: jobData.currentAttempt + ) + + logger.warning("Job failed, retrying", metadata: [ + "retry-delay": "\(delay)", "error": "\(error)", "next-attempt": "\(updatedData.currentAttempt)", "retries-left": "\(updatedData.remainingAttempts)" + ]) + try await self.queue.clear(id).get() + try await self.queue.set(id, to: updatedData).get() + try await self.queue.push(id).get() } } diff --git a/Sources/Queues/QueuesCommand.swift b/Sources/Queues/QueuesCommand.swift index cd2cada..a242d35 100644 --- a/Sources/Queues/QueuesCommand.swift +++ b/Sources/Queues/QueuesCommand.swift @@ -1,23 +1,18 @@ import ConsoleKit -import Dispatch +@preconcurrency import Dispatch import Vapor import NIOConcurrencyHelpers import NIOCore import Atomics -#if os(Linux) -import Glibc -#else -import Darwin.C -#endif /// The command to start the Queue job -public final class QueuesCommand: Command { - /// See `Command.signature` +public final class QueuesCommand: AsyncCommand, Sendable { + // See `Command.signature`. public let signature = Signature() - /// See `Command.Signature` + // See `Command.Signature`. public struct Signature: CommandSignature { - public init() { } + public init() {} @Option(name: "queue", help: "Specifies a single queue to run") var queue: String? @@ -26,57 +21,51 @@ public final class QueuesCommand: Command { var scheduled: Bool } - /// See `Command.help` - public var help: String { - return "Starts the Vapor Queues worker" - } + // See `Command.help`. + public var help: String { "Starts the Vapor Queues worker" } private let application: Application - private var jobTasks: [RepeatedTask] - private var scheduledTasks: [String: AnyScheduledJob.Task] - private var lock: NIOLock - private var signalSources: [DispatchSourceSignal] - private var didShutdown: Bool - private let isShuttingDown: ManagedAtomic + private let box: NIOLockedValueBox - private var eventLoopGroup: EventLoopGroup { - self.application.eventLoopGroup + struct Box: Sendable { + var jobTasks: [RepeatedTask] + var scheduledTasks: [String: AnyScheduledJob.Task] + var signalSources: [any DispatchSourceSignal] + var didShutdown: Bool } - - /// Create a new `QueueCommand` + + /// Create a new ``QueuesCommand``. + /// + /// - Parameters: + /// - application: The active Vapor `Application`. + /// - scheduled: This parameter is a historical artifact and has no effect. public init(application: Application, scheduled: Bool = false) { self.application = application - self.jobTasks = [] - self.scheduledTasks = [:] - self.isShuttingDown = .init(false) - self.signalSources = [] - self.didShutdown = false - self.lock = .init() + self.box = .init(.init(jobTasks: [], scheduledTasks: [:], signalSources: [], didShutdown: false)) } - /// Runs the command - /// - Parameters: - /// - context: A `CommandContext` for the command to run on - /// - signature: The signature of the command - public func run(using context: CommandContext, signature: QueuesCommand.Signature) throws { - self.application.logger.trace("Begginning the run function") - + // See `AsyncCommand.run(using:signature:)`. + public func run(using context: CommandContext, signature: QueuesCommand.Signature) async throws { // shutdown future - let promise = self.application.eventLoopGroup.next().makePromise(of: Void.self) + let promise = self.application.eventLoopGroup.any().makePromise(of: Void.self) self.application.running = .start(using: promise) // setup signal sources for shutdown let signalQueue = DispatchQueue(label: "codes.vapor.jobs.command") func makeSignalSource(_ code: Int32) { + #if canImport(Darwin) + /// https://github.com/swift-server/swift-service-lifecycle/blob/main/Sources/UnixSignals/UnixSignalsSequence.swift#L77-L82 + signal(code, SIG_IGN) + #endif + let source = DispatchSource.makeSignalSource(signal: code, queue: signalQueue) source.setEventHandler { print() // clear ^C promise.succeed(()) } source.resume() - self.signalSources.append(source) - signal(code, SIG_IGN) + self.box.withLockedValue { $0.signalSources.append(source) } } makeSignalSource(SIGTERM) makeSignalSource(SIGINT) @@ -85,55 +74,53 @@ public final class QueuesCommand: Command { self.application.logger.info("Starting scheduled jobs worker") try self.startScheduledJobs() } else { - let queue: QueueName = signature.queue - .flatMap { .init(string: $0) } ?? .default + let queue: QueueName = signature.queue.map { .init(string: $0) } ?? .default + self.application.logger.info("Starting jobs worker", metadata: ["queue": .string(queue.string)]) try self.startJobs(on: queue) } } /// Starts an in-process jobs worker for queued tasks + /// /// - Parameter queueName: The queue to run the jobs on public func startJobs(on queueName: QueueName) throws { let workerCount: Int switch self.application.queues.configuration.workerCount { case .default: - var count = 0 - for _ in self.eventLoopGroup.makeIterator() { - count += 1 - } - workerCount = count - self.application.logger.trace("Default workerCount, setting to \(workerCount)") + workerCount = self.application.eventLoopGroup.makeIterator().reduce(0, { n, _ in n + 1 }) + self.application.logger.trace("Using default worker count", metadata: ["workerCount": "\(workerCount)"]) case .custom(let custom): workerCount = custom - self.application.logger.trace("Custom workerCount, setting to \(workerCount)") + self.application.logger.trace("Using custom worker count", metadata: ["workerCount": "\(workerCount)"]) } - for i in 0.. = .init(.init()) + /// The number of seconds to wait before checking for the next job. Defaults to `1` - public var refreshInterval: TimeAmount + public var refreshInterval: TimeAmount { + get { self.dataBox.withLockedValue { $0.refreshInterval } } + set { self.dataBox.withLockedValue { $0.refreshInterval = newValue } } + } /// The key that stores the data about a job. Defaults to `vapor_queues` - public var persistenceKey: String + public var persistenceKey: String { + get { self.dataBox.withLockedValue { $0.persistenceKey } } + set { self.dataBox.withLockedValue { $0.persistenceKey = newValue } } + } /// Supported options for number of job handling workers. - public enum WorkerCount: ExpressibleByIntegerLiteral { + public enum WorkerCount: ExpressibleByIntegerLiteral, Sendable { /// One worker per event loop. case `default` @@ -24,50 +45,61 @@ public struct QueuesConfiguration { } /// Sets the number of workers used for handling jobs. - public var workerCount: WorkerCount + public var workerCount: WorkerCount { + get { self.dataBox.withLockedValue { $0.workerCount } } + set { self.dataBox.withLockedValue { $0.workerCount = newValue } } + } /// A logger public let logger: Logger // Arbitrary user info to be stored - public var userInfo: [AnyHashable: Any] + public var userInfo: [AnySendableHashable: any Sendable] { + get { self.dataBox.withLockedValue { $0.userInfo } } + set { self.dataBox.withLockedValue { $0.userInfo = newValue } } + } - var jobs: [String: AnyJob] - var scheduledJobs: [AnyScheduledJob] - var notificationHooks: [JobEventDelegate] + var jobs: [String: any AnyJob] { + get { self.dataBox.withLockedValue { $0.jobs } } + set { self.dataBox.withLockedValue { $0.jobs = newValue } } + } - /// Creates an empty `JobsConfig` + var scheduledJobs: [AnyScheduledJob] { + get { self.dataBox.withLockedValue { $0.scheduledJobs } } + set { self.dataBox.withLockedValue { $0.scheduledJobs = newValue } } + } + + var notificationHooks: [any JobEventDelegate] { + get { self.dataBox.withLockedValue { $0.notificationHooks } } + set { self.dataBox.withLockedValue { $0.notificationHooks = newValue } } + } + + /// Creates an empty ``QueuesConfiguration``. public init( refreshInterval: TimeAmount = .seconds(1), persistenceKey: String = "vapor_queues", workerCount: WorkerCount = .default, logger: Logger = .init(label: "codes.vapor.queues") ) { - self.jobs = [:] - self.scheduledJobs = [] self.logger = logger self.refreshInterval = refreshInterval self.persistenceKey = persistenceKey self.workerCount = workerCount - self.userInfo = [:] - self.notificationHooks = [] } - /// Adds a new `Job` to the queue configuration. - /// This must be called on all `Job` objects before they can be run in a queue. + /// Adds a new ``Job`` to the queue configuration. /// - /// - Parameter job: The `Job` to add. - mutating public func add(_ job: J) - where J: Job - { - self.logger.trace("Adding job type: \(J.name)") + /// This must be called on all ``Job`` objects before they can be run in a queue. + /// + /// - Parameter job: The ``Job`` to add. + mutating public func add(_ job: J) { + self.logger.trace("Adding job type", metadata: ["name": "\(J.name)"]) if let existing = self.jobs[J.name] { - self.logger.warning("A job is already registered with key \(J.name): \(existing)") + self.logger.warning("Job type is already registered", metadata: ["name": "\(J.name)", "existing": "\(existing)"]) } self.jobs[J.name] = job } - /// Schedules a new job for execution at a later date. /// /// config.schedule(Cleanup()) @@ -76,21 +108,20 @@ public struct QueuesConfiguration { /// .on(23) /// .at(.noon) /// - /// - Parameter job: The `ScheduledJob` to be scheduled. - mutating internal func schedule(_ job: J, builder: ScheduleBuilder = ScheduleBuilder()) -> ScheduleBuilder - where J: ScheduledJob - { - self.logger.trace("Scheduling \(job.name)") - let storage = AnyScheduledJob(job: job, scheduler: builder) - self.scheduledJobs.append(storage) + /// - Parameters: + /// - job: The ``ScheduledJob`` to schedule. + /// - builder: A ``ScheduleBuilder`` to use for scheduling. + /// - Returns: The passed-in ``ScheduleBuilder``. + mutating func schedule(_ job: some ScheduledJob, builder: ScheduleBuilder = .init()) -> ScheduleBuilder { + self.logger.trace("Scheduling job", metadata: ["job-name": "\(job.name)"]) + self.scheduledJobs.append(AnyScheduledJob(job: job, scheduler: builder)) return builder } /// Adds a notification hook that can receive status updates about jobs - /// - Parameter hook: The `NotificationHook` object - mutating public func add(_ hook: N) - where N: JobEventDelegate - { + /// + /// - Parameter hook: A ``JobEventDelegate`` to register. + mutating public func add(_ hook: some JobEventDelegate) { self.logger.trace("Adding notification hook") self.notificationHooks.append(hook) } diff --git a/Sources/Queues/QueuesDriver.swift b/Sources/Queues/QueuesDriver.swift index 02452d7..2a60ad0 100644 --- a/Sources/Queues/QueuesDriver.swift +++ b/Sources/Queues/QueuesDriver.swift @@ -1,18 +1,19 @@ /// A new driver for Queues -public protocol QueuesDriver { - /// Makes the queue worker - /// - Parameter context: The context of the job - func makeQueue(with context: QueueContext) -> Queue +public protocol QueuesDriver: Sendable { + /// Create or look up a named ``Queue`` instance. + /// + /// - Parameter context: The context for jobs on the queue. Also provides the queue name. + func makeQueue(with context: QueueContext) -> any Queue /// Shuts down the driver func shutdown() - /// Shut down the driver asyncrhonously. Helps avoid calling `.wait()` + /// Shut down the driver asynchronously. Helps avoid calling `.wait()` func asyncShutdown() async } extension QueuesDriver { public func asyncShutdown() async { - shutdown() + self.shutdown() } } diff --git a/Sources/Queues/QueuesEventLoopPreference.swift b/Sources/Queues/QueuesEventLoopPreference.swift index aa9e7fe..55307e6 100644 --- a/Sources/Queues/QueuesEventLoopPreference.swift +++ b/Sources/Queues/QueuesEventLoopPreference.swift @@ -1,11 +1,3 @@ -// -// JobsEventLoopPreference.swift -// -// -// Created by Jimmy McDermott on 10/24/19. -// - -import Foundation import NIOCore /// Determines which event loop the jobs worker uses while executing jobs. @@ -17,13 +9,13 @@ public enum QueuesEventLoopPreference { /// called back (delegated to) on the supplied EventLoop. /// If possible, the connection should also be on this EventLoop for /// improved performance. - case delegate(on: EventLoop) + case delegate(on: any EventLoop) /// Returns the delegate EventLoop given an EventLoopGroup. - public func delegate(for eventLoopGroup: EventLoopGroup) -> EventLoop { + public func delegate(for eventLoopGroup: any EventLoopGroup) -> any EventLoop { switch self { case .indifferent: - return eventLoopGroup.next() + return eventLoopGroup.any() case .delegate(let eventLoop): return eventLoop } diff --git a/Sources/Queues/RepeatedTask+Cancel.swift b/Sources/Queues/RepeatedTask+Cancel.swift index 841fa0c..db91990 100644 --- a/Sources/Queues/RepeatedTask+Cancel.swift +++ b/Sources/Queues/RepeatedTask+Cancel.swift @@ -1,23 +1,24 @@ import NIOCore +import Logging extension RepeatedTask { - func syncCancel(on eventLoop: EventLoop) { + func syncCancel(on eventLoop: any EventLoop) { do { let promise = eventLoop.makePromise(of: Void.self) self.cancel(promise: promise) try promise.futureResult.wait() } catch { - print("failed cancelling repeated task \(error)") + Logger(label: "codes.vapor.queues.repeatedtask").debug("Failed cancelling repeated task", metadata: ["error": "\(error)"]) } } - func asyncCancel(on eventLoop: EventLoop) async { + func asyncCancel(on eventLoop: any EventLoop) async { do { let promise = eventLoop.makePromise(of: Void.self) self.cancel(promise: promise) try await promise.futureResult.get() } catch { - print("failed cancelling repeated task \(error)") + Logger(label: "codes.vapor.queues.repeatedtask").debug("Failed cancelling repeated task", metadata: ["error": "\(error)"]) } } } diff --git a/Sources/Queues/Request+Queues.swift b/Sources/Queues/Request+Queues.swift index 6a1e6f4..1327cc7 100644 --- a/Sources/Queues/Request+Queues.swift +++ b/Sources/Queues/Request+Queues.swift @@ -3,18 +3,15 @@ import Vapor import NIOCore extension Request { - /// Returns the default job `Queue` - public var queue: Queue { + /// Get the default ``Queue``. + public var queue: any Queue { self.queues(.default) } - /// Returns the specific job `Queue` for the given queue name + /// Create or look up an instance of a named ``Queue`` and bind it to this request's event loop. + /// /// - Parameter queue: The queue name - public func queues(_ queue: QueueName) -> Queue { - self.application.queues.queue( - queue, - logger: self.logger, - on: self.eventLoop - ) + public func queues(_ queue: QueueName, logger: Logger? = nil) -> any Queue { + self.application.queues.queue(queue, logger: logger ?? self.logger, on: self.eventLoop) } } diff --git a/Sources/Queues/ScheduleBuilder.swift b/Sources/Queues/ScheduleBuilder.swift index 9a6a2b4..e42f58a 100644 --- a/Sources/Queues/ScheduleBuilder.swift +++ b/Sources/Queues/ScheduleBuilder.swift @@ -1,22 +1,21 @@ import Foundation /// An object that can be used to build a scheduled job -public final class ScheduleBuilder { - +public final class ScheduleBuilder: @unchecked Sendable { /// Months of the year public enum Month: Int { case january = 1 - case february = 2 - case march = 3 - case april = 4 - case may = 5 - case june = 6 - case july = 7 - case august = 8 - case september = 9 - case october = 10 - case november = 11 - case december = 12 + case february + case march + case april + case may + case june + case july + case august + case september + case october + case november + case december } /// Describes a day @@ -25,91 +24,50 @@ public final class ScheduleBuilder { case last case exact(Int) - public init(integerLiteral value: Int) { - self = .exact(value) - } + public init(integerLiteral value: Int) { self = .exact(value) } } /// Describes a day of the week public enum Weekday: Int { case sunday = 1 - case monday = 2 - case tuesday = 3 - case wednesday = 4 - case thursday = 5 - case friday = 6 - case saturday = 7 + case monday + case tuesday + case wednesday + case thursday + case friday + case saturday } /// Describes a time of day public struct Time: ExpressibleByStringLiteral, CustomStringConvertible { - var hour: Hour24 - var minute: Minute - /// Returns a `Time` object at midnight (12:00 AM) - public static var midnight: Time { - return .init(12, 00, .am) - } + public static var midnight: Time { .init(12, 00, .am) } /// Returns a `Time` object at noon (12:00 PM) - public static var noon: Time { - return .init(12, 00, .pm) - } - - /// The readable description of the time - public var description: String { - return "\(self.hour):\(self.minute)" - } + public static var noon: Time { .init(12, 00, .pm) } - init(_ hour: Hour24, _ minute: Minute) { - self.hour = hour - self.minute = minute - } + var hour: Hour24, minute: Minute + init(_ hour: Hour24, _ minute: Minute) { (self.hour, self.minute) = (hour, minute) } + init(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { - switch period { - case .am: - if hour.number == 12 && minute.number == 0 { - self.hour = .init(0) - } else { - self.hour = .init(hour.number) - } - case .pm: - if hour.number == 12 { - self.hour = .init(12) - } else { - self.hour = .init(hour.number + 12) - } - } - self.minute = minute + self.init(.init(hour.n % 12 + (period == .am ? 0 : 12)), minute) } - - /// Takes a stringLiteral and returns a `TimeObject`. Must be in the format `00:00am/pm` + public init(stringLiteral value: String) { - let parts = value.split(separator: ":") + let parts = value.split(separator: ":", maxSplits: 1) + + guard let hour = Int(parts[0]) else { fatalError("Could not convert hour to Int") } switch parts.count { case 1: - guard let hour = Int(parts[0]) else { - fatalError("Could not convert hour to Int") - } self.init(Hour24(hour), 0) case 2: - guard let hour = Int(parts[0]) else { - fatalError("Could not convert hour to Int") - } + guard let minute = Int(parts[1].prefix(2)) else { fatalError("Could not convert minute to Int") } switch parts[1].count { case 2: - guard let minute = Int(parts[1]) else { - fatalError("Could not convert minute to Int") - } self.init(Hour24(hour), Minute(minute)) case 4: - let s = parts[1] - guard let minute = Int(s[s.startIndex.. 0, "12-hour clock cannot preceed 1") - precondition(number <= 12, "12-hour clock cannot exceed 12") - self.number = number - } - - /// Takes an integerLiteral and creates a `Hour12`. Must be `> 0 && <= 12` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((1 ... 12).contains(n), "12-hour clock must be in range 1-12"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { "\(self.n)" } } /// Represents an hour numeral that must be in 24 hour format public struct Hour24: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int - - /// The readable description of the hour, zero padding included - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } - - init(_ number: Int) { - precondition(number >= 0, "24-hour clock cannot preceed 0") - precondition(number < 24, "24-hour clock cannot exceed 24") - self.number = number - } - - /// Takes an integerLiteral and creates a `Hour24`. Must be `>= 0 && < 24` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((0 ..< 24).contains(n), "24-hour clock must be in range 0-23"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { String("0\(self.n)".suffix(2)) } } /// A period of hours - either `am` or `pm` - public enum HourPeriod: ExpressibleByStringLiteral, CustomStringConvertible { - case am - case pm - - /// The readable string - public var description: String { - switch self { - case .am: - return "am" - case .pm: - return "pm" - } - } - - init(_ string: String) { - switch string.lowercased() { - case "am": - self = .am - case "pm": - self = .pm - default: - fatalError("Unknown hour period: \(string), must be am or pm") - } - } - - /// Takes a stringLiteral and creates a `HourPeriod.` Must be `am` or `pm` - public init(stringLiteral value: String) { - self.init(value) - } + public enum HourPeriod: String, ExpressibleByStringLiteral, CustomStringConvertible, Hashable { + case am, pm + + init(_ string: String) { self.init(rawValue: string)! } + + public init(stringLiteral value: String) { self.init(value) } + + public var description: String { self.rawValue } } /// Describes a minute numeral public struct Minute: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int - - /// The readable minute, zero padded. - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } - - init(_ number: Int) { - assert(number >= 0, "Minute cannot preceed 0") - assert(number < 60, "Minute cannot exceed 60") - self.number = number - } - - /// Takes an integerLiteral and creates a `Minute`. Must be `>= 0 && < 60` - public init(integerLiteral value: Int) { - self.init(value) - } + let n: Int + + init(_ n: Int) { precondition((0 ..< 60).contains(n), "Minute must be in range 0-59"); self.n = n } + + public init(integerLiteral value: Int) { self.init(value) } + + public var description: String { String("0\(self.n)".suffix(2)) } } /// Describes a second numeral public struct Second: ExpressibleByIntegerLiteral, CustomStringConvertible { - let number: Int + let n: Int - /// The readable second, zero padded. - public var description: String { - switch self.number { - case 0..<10: - return "0" + self.number.description - default: - return self.number.description - } - } + init(_ n: Int) { precondition((0 ..< 60).contains(n), "Second must be in range 0-59"); self.n = n } - init(_ number: Int) { - assert(number >= 0, "Second cannot preceed 0") - assert(number < 60, "Second cannot exceed 60") - self.number = number - } + public init(integerLiteral value: Int) { self.init(value) } - /// Takes an integerLiteral and creates a `Second`. Must be `>= 0 && < 60` - public init(integerLiteral value: Int) { - self.init(value) - } + public var description: String { String("0\(self.n)".suffix(2)) } } - // MARK: Builders - /// An object to build a `Yearly` scheduled job public struct Yearly { let builder: ScheduleBuilder - /// The month to run the job in - /// - Parameter month: A `Month` to run the job in - public func `in`(_ month: Month) -> Monthly { - self.builder.month = month - return self.builder.monthly() - } + public func `in`(_ month: Month) -> Monthly { self.builder.month = month; return self.builder.monthly() } } /// An object to build a `Monthly` scheduled job public struct Monthly { let builder: ScheduleBuilder - /// The day to run the job on - /// - Parameter day: A `Day` to run the job on - public func on(_ day: Day) -> Daily { - self.builder.day = day - return self.builder.daily() - } + public func on(_ day: Day) -> Daily { self.builder.day = day; return self.builder.daily() } } /// An object to build a `Weekly` scheduled job public struct Weekly { let builder: ScheduleBuilder - /// The day of week to run the job on - /// - Parameter dayOfWeek: A `DayOfWeek` to run the job on - public func on(_ weekday: Weekday) -> Daily { - self.builder.weekday = weekday - return self.builder.daily() - } + public func on(_ weekday: Weekday) -> Daily { self.builder.weekday = weekday; return self.builder.daily() } } /// An object to build a `Daily` scheduled job public struct Daily { let builder: ScheduleBuilder - /// The time to run the job at - /// - Parameter time: A `Time` object to run the job on - public func at(_ time: Time) { - self.builder.time = time - } + public func at(_ time: Time) { self.builder.time = time } - /// The 24 hour time to run the job at - /// - Parameter hour: A `Hour24` to run the job at - /// - Parameter minute: A `Minute` to run the job at - public func at(_ hour: Hour24, _ minute: Minute) { - self.at(.init(hour, minute)) - } + public func at(_ hour: Hour24, _ minute: Minute) { self.at(.init(hour, minute)) } - /// The 12 hour time to run the job at - /// - Parameter hour: A `Hour12` to run the job at - /// - Parameter minute: A `Minute` to run the job at - /// - Parameter period: A `HourPeriod` to run the job at (`am` or `pm`) - public func at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { - self.at(.init(hour, minute, period)) - } + public func at(_ hour: Hour12, _ minute: Minute, _ period: HourPeriod) { self.at(.init(hour, minute, period)) } } /// An object to build a `Hourly` scheduled job public struct Hourly { let builder: ScheduleBuilder - /// The minute to run the job at - /// - Parameter minute: A `Minute` to run the job at - public func at(_ minute: Minute) { - self.builder.minute = minute - } + public func at(_ minute: Minute) { self.builder.minute = minute } } /// An object to build a `EveryMinute` scheduled job public struct Minutely { let builder: ScheduleBuilder - - /// The second to run the job at - /// - Parameter second: A `Second` to run the job at - public func at(_ second: Second) { - self.builder.second = second - } + + public func at(_ second: Second) { self.builder.second = second } } - /// Retrieves the next date - /// - Parameter current: The current date - /// - Returns: The next date + /// Retrieves the next date after the one given. public func nextDate(current: Date = .init()) -> Date? { - if let date = self.date, date > current { - return date - } - + if let date = self.date, date > current { return date } + var components = DateComponents() - if let milliseconds = millisecond { - components.nanosecond = milliseconds - } - if let second = self.second { - components.second = second.number - } - if let minute = self.minute { - components.minute = minute.number - } - if let time = self.time { - components.minute = time.minute.number - components.hour = time.hour.number - } - if let weekday = self.weekday { - components.weekday = weekday.rawValue - } - if let day = self.day { - switch day { - case .first: - components.day = 1 - case .last: - fatalError("Last day of the month is not yet supported.") - case .exact(let exact): - components.day = exact - } - } - if let month = self.month { - components.month = month.rawValue - } - return calendar.nextDate( - after: current, - matching: components, - matchingPolicy: .strict - ) + components.nanosecond = self.millisecond.map { $0 * 1_000_000 } + components.second = self.second?.n + components.minute = self.time?.minute.n ?? self.minute?.n + components.hour = self.time?.hour.n + components.weekday = self.weekday?.rawValue + switch self.day { + case .first?: components.day = 1 + case .exact(let exact)?: components.day = exact + case .last?: fatalError("Last day of the month is not yet supported.") + default: break + } + components.month = self.month?.rawValue + return calendar.nextDate(after: current, matching: components, matchingPolicy: .strict) } - /// The caledar used to compute the next date + /// The calendar used to compute the next date var calendar: Calendar /// Date to perform task (one-off job) var date: Date? - var month: Month? - var day: Day? - var weekday: Weekday? - var time: Time? - var minute: Minute? - var second: Second? - var millisecond: Int? - - public init(calendar: Calendar = .current) { - self.calendar = calendar - } + var month: Month?, day: Day?, weekday: Weekday? + var time: Time?, minute: Minute?, second: Second?, millisecond: Int? + + public init(calendar: Calendar = .current) { self.calendar = calendar } - // MARK: Helpers - /// Schedules a job using a specific `Calendar` - public func using(_ calendar: Calendar) -> ScheduleBuilder { - self.calendar = calendar - return self - } - + public func using(_ calendar: Calendar) -> ScheduleBuilder { self.calendar = calendar; return self } + /// Schedules a job at a specific date - public func at(_ date: Date) { - self.date = date - } - + public func at(_ date: Date) { self.date = date } + /// Creates a yearly scheduled job for further building - public func yearly() -> Yearly { - return Yearly(builder: self) - } + @discardableResult public func yearly() -> Yearly { .init(builder: self) } /// Creates a monthly scheduled job for further building - public func monthly() -> Monthly { - return Monthly(builder: self) - } + @discardableResult public func monthly() -> Monthly { .init(builder: self) } /// Creates a weekly scheduled job for further building - public func weekly() -> Weekly { - return Weekly(builder: self) - } + @discardableResult public func weekly() -> Weekly { .init(builder: self) } /// Creates a daily scheduled job for further building - public func daily() -> Daily { - return Daily(builder: self) - } + @discardableResult public func daily() -> Daily { .init(builder: self) } /// Creates a hourly scheduled job for further building - public func hourly() -> Hourly { - return Hourly(builder: self) - } - + @discardableResult public func hourly() -> Hourly { .init(builder: self) } + /// Creates a minutely scheduled job for further building - @discardableResult - public func minutely() -> Minutely { - return Minutely(builder: self) - } - + @discardableResult public func minutely() -> Minutely { .init(builder: self) } + /// Runs a job every second - public func everySecond() { - self.millisecond = 0 - } + public func everySecond() { self.millisecond = 0 } } - diff --git a/Sources/Queues/ScheduledJob.swift b/Sources/Queues/ScheduledJob.swift index b1be2ce..6fde1fb 100644 --- a/Sources/Queues/ScheduledJob.swift +++ b/Sources/Queues/ScheduledJob.swift @@ -3,10 +3,12 @@ import Foundation import Logging /// Describes a job that can be scheduled and repeated -public protocol ScheduledJob { +public protocol ScheduledJob: Sendable { var name: String { get } - /// The method called when the job is run - /// - Parameter context: A `JobContext` that can be used + + /// The method called when the job is run. + /// + /// - Parameter context: The ``QueueContext``. func run(context: QueueContext) -> EventLoopFuture } @@ -14,37 +16,40 @@ extension ScheduledJob { public var name: String { "\(Self.self)" } } -class AnyScheduledJob { - let job: ScheduledJob +final class AnyScheduledJob: Sendable { + let job: any ScheduledJob let scheduler: ScheduleBuilder - init(job: ScheduledJob, scheduler: ScheduleBuilder) { + init(job: any ScheduledJob, scheduler: ScheduleBuilder) { self.job = job self.scheduler = scheduler } -} -extension AnyScheduledJob { - struct Task { + struct Task: Sendable { let task: RepeatedTask let done: EventLoopFuture } func schedule(context: QueueContext) -> Task? { - context.logger.trace("Beginning the scheduler process") + var logger_ = context.logger + logger_[metadataKey: "job-name"] = "\(self.job.name)" + let logger = logger_ + + logger.trace("Beginning the scheduler process") + guard let date = self.scheduler.nextDate() else { - context.logger.debug("No date scheduled for \(self.job.name)") + logger.debug("Scheduler returned no date") return nil } - context.logger.debug("Scheduling \(self.job.name) to run at \(date)") + logger.debug("Job scheduled", metadata: ["scheduled-date": "\(date)"]) + let promise = context.eventLoop.makePromise(of: Void.self) let task = context.eventLoop.scheduleRepeatedTask( - initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), - delay: .seconds(0) + initialDelay: .microseconds(Int64(date.timeIntervalSinceNow * 1_000_000)), delay: .zero ) { task in // always cancel task.cancel() - context.logger.trace("Running the scheduled job \(self.job.name)") + logger.trace("Running scheduled job") self.job.run(context: context).cascade(to: promise) } return .init(task: task, done: promise.futureResult) diff --git a/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg b/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg new file mode 100644 index 0000000..f40f693 --- /dev/null +++ b/Sources/XCTQueues/Docs.docc/Resources/vapor-queues-logo.svg @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + + diff --git a/Sources/XCTQueues/Docs.docc/theme-settings.json b/Sources/XCTQueues/Docs.docc/theme-settings.json new file mode 100644 index 0000000..1e0af2e --- /dev/null +++ b/Sources/XCTQueues/Docs.docc/theme-settings.json @@ -0,0 +1,21 @@ +{ + "theme": { + "aside": { "border-radius": "16px", "border-style": "double", "border-width": "3px" }, + "border-radius": "0", + "button": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "code": { "border-radius": "16px", "border-width": "1px", "border-style": "solid" }, + "color": { + "queues": "#e8665a", + "documentation-intro-fill": "radial-gradient(circle at top, var(--color-queues) 30%, #000 100%)", + "documentation-intro-accent": "var(--color-queues)", + "logo-base": { "dark": "#fff", "light": "#000" }, + "logo-shape": { "dark": "#000", "light": "#fff" }, + "fill": { "dark": "#000", "light": "#fff" } + }, + "icons": { "technology": "/queues/images/vapor-queues-logo.svg" } + }, + "features": { + "quickNavigation": { "enable": true }, + "i18n": { "enable": true } + } +} diff --git a/Sources/XCTQueues/TestQueueDriver.swift b/Sources/XCTQueues/TestQueueDriver.swift index a4ed842..ace6885 100644 --- a/Sources/XCTQueues/TestQueueDriver.swift +++ b/Sources/XCTQueues/TestQueueDriver.swift @@ -10,17 +10,20 @@ extension Application.Queues.Provider { return $0.queues.use(custom: TestQueuesDriver()) } } + + public static var asyncTest: Self { + .init { + $0.queues.initializeAsyncTestStorage() + return $0.queues.use(custom: AsyncTestQueuesDriver()) + } + } } struct TestQueuesDriver: QueuesDriver { - let lock: NIOLock - - init() { - self.lock = .init() - } + init() {} - func makeQueue(with context: QueueContext) -> Queue { - TestQueue(lock: self.lock, context: context) + func makeQueue(with context: QueueContext) -> any Queue { + TestQueue(_context: .init(context)) } func shutdown() { @@ -28,50 +31,60 @@ struct TestQueuesDriver: QueuesDriver { } } +struct AsyncTestQueuesDriver: QueuesDriver { + init() {} + func makeQueue(with context: QueueContext) -> any Queue { AsyncTestQueue(_context: .init(context)) } + func shutdown() {} +} + extension Application.Queues { - public final class TestQueueStorage { - public var jobs: [JobIdentifier: JobData] = [:] - public var queue: [JobIdentifier] = [] - - /// Returns all jobs in the queue of the specific `J` type. - public func all(_ job: J.Type) -> [J.Payload] - where J: Job - { - let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } + public final class TestQueueStorage: Sendable { + private struct Box: Sendable { + var jobs: [JobIdentifier: JobData] = [:] + var queue: [JobIdentifier] = [] + } + private let box = NIOLockedValueBox(.init()) - return queue + public var jobs: [JobIdentifier: JobData] { + get { self.box.withLockedValue { $0.jobs } } + set { self.box.withLockedValue { $0.jobs = newValue } } + } + + public var queue: [JobIdentifier] { + get { self.box.withLockedValue { $0.queue } } + set { self.box.withLockedValue { $0.queue = newValue } } + } + + /// Returns the payloads of all jobs in the queue having type `J`. + public func all(_ job: J.Type) -> [J.Payload] { + let filteredJobIds = self.jobs.filter { $1.jobName == J.name }.map { $0.0 } + + return self.queue .filter { filteredJobIds.contains($0) } .compactMap { jobs[$0] } .compactMap { try? J.parsePayload($0.payload) } } - /// Returns the first job in the queue of the specific `J` type. - public func first(_ job: J.Type) -> J.Payload? - where J: Job - { + /// Returns the payload of the first job in the queue having type `J`. + public func first(_ job: J.Type) -> J.Payload? { let filteredJobIds = jobs.filter { $1.jobName == J.name }.map { $0.0 } - guard - let queueJob = queue.first(where: { filteredJobIds.contains($0) }), - let jobData = jobs[queueJob] - else { - return nil - } + guard let queueJob = self.queue.first(where: { filteredJobIds.contains($0) }), let jobData = self.jobs[queueJob] else { + return nil + } return try? J.parsePayload(jobData.payload) } /// Checks whether a job of type `J` was dispatched to queue - public func contains(_ job: J.Type) -> Bool - where J: Job - { - return first(job) != nil + public func contains(_ job: J.Type) -> Bool { + self.first(job) != nil } } struct TestQueueKey: StorageKey, LockKey { typealias Value = TestQueueStorage } - + public var test: TestQueueStorage { self.application.storage[TestQueueKey.self]! } @@ -79,50 +92,66 @@ extension Application.Queues { func initializeTestStorage() { self.application.storage[TestQueueKey.self] = .init() } + + struct AsyncTestQueueKey: StorageKey, LockKey { + typealias Value = TestQueueStorage + } + + public var asyncTest: TestQueueStorage { + self.application.storage[AsyncTestQueueKey.self]! + } + + func initializeAsyncTestStorage() { + self.application.storage[AsyncTestQueueKey.self] = .init() + } } struct TestQueue: Queue { - let lock: NIOLock - let context: QueueContext + let _context: NIOLockedValueBox + var context: QueueContext { self._context.withLockedValue { $0 } } func get(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - return self.context.eventLoop.makeSucceededFuture( - self.context.application.queues.test.jobs[id]! - ) + self._context.withLockedValue { context in + context.eventLoop.makeSucceededFuture(context.application.queues.test.jobs[id]!) + } } func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.jobs[id] = data - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.jobs[id] = data + return context.eventLoop.makeSucceededVoidFuture() + } } func clear(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.jobs[id] = nil - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.jobs[id] = nil + return context.eventLoop.makeSucceededVoidFuture() + } } func pop() -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - let last = context.application.queues.test.queue.popLast() - return self.context.eventLoop.makeSucceededFuture(last) + self._context.withLockedValue { context in + let last = context.application.queues.test.queue.popLast() + return context.eventLoop.makeSucceededFuture(last) + } } func push(_ id: JobIdentifier) -> EventLoopFuture { - self.lock.lock() - defer { self.lock.unlock() } - - self.context.application.queues.test.queue.append(id) - return self.context.eventLoop.makeSucceededFuture(()) + self._context.withLockedValue { context in + context.application.queues.test.queue.append(id) + return context.eventLoop.makeSucceededVoidFuture() + } } } + +struct AsyncTestQueue: AsyncQueue { + let _context: NIOLockedValueBox + var context: QueueContext { self._context.withLockedValue { $0 } } + + func get(_ id: JobIdentifier) async throws -> JobData { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id]! } } + func set(_ id: JobIdentifier, to data: JobData) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = data } } + func clear(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.jobs[id] = nil } } + func pop() async throws -> JobIdentifier? { self._context.withLockedValue { $0.application.queues.asyncTest.queue.popLast() } } + func push(_ id: JobIdentifier) async throws { self._context.withLockedValue { $0.application.queues.asyncTest.queue.append(id) } } +} diff --git a/Tests/QueuesTests/AsyncQueueTests.swift b/Tests/QueuesTests/AsyncQueueTests.swift index 7c0f42e..d77cd01 100644 --- a/Tests/QueuesTests/AsyncQueueTests.swift +++ b/Tests/QueuesTests/AsyncQueueTests.swift @@ -1,16 +1,26 @@ import Queues -import Foundation -import Vapor import XCTest import XCTVapor -import XCTQueues -@testable import Vapor -import NIOCore -import NIOConcurrencyHelpers + +func XCTAssertNoThrowAsync( + _ expression: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + _ = try await expression() + } catch { + XCTAssertNoThrow(try { throw error }(), message(), file: file, line: line) + } +} final class AsyncQueueTests: XCTestCase { var app: Application! + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + override func setUp() async throws { app = try await Application.make(.testing) } @@ -19,15 +29,15 @@ final class AsyncQueueTests: XCTestCase { try await app.asyncShutdown() } - func testAsyncJob() async throws { + func testAsyncJobWithSyncQueue() async throws { app.queues.use(.test) let promise = app.eventLoopGroup.any().makePromise(of: Void.self) app.queues.add(MyAsyncJob(promise: promise)) app.get("foo") { req in - req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) - .map { _ in "done" } + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" } try await app.testable().test(.GET, "foo") { res async in @@ -46,7 +56,37 @@ final class AsyncQueueTests: XCTestCase { XCTAssertEqual(app.queues.test.queue.count, 0) XCTAssertEqual(app.queues.test.jobs.count, 0) - try XCTAssertNoThrow(promise.futureResult.wait()) + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) + } + + func testAsyncJobWithAsyncQueue() async throws { + app.queues.use(.asyncTest) + + let promise = app.eventLoopGroup.any().makePromise(of: Void.self) + app.queues.add(MyAsyncJob(promise: promise)) + + app.get("foo") { req in + try await req.queue.dispatch(MyAsyncJob.self, .init(foo: "bar")) + return "done" + } + + try await app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + XCTAssertEqual(app.queues.asyncTest.queue.count, 1) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 1) + let job = app.queues.asyncTest.first(MyAsyncJob.self) + XCTAssert(app.queues.asyncTest.contains(MyAsyncJob.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + + try await app.queues.queue.worker.run().get() + XCTAssertEqual(app.queues.asyncTest.queue.count, 0) + XCTAssertEqual(app.queues.asyncTest.jobs.count, 0) + + await XCTAssertNoThrowAsync(try await promise.futureResult.get()) } } @@ -58,7 +98,6 @@ struct MyAsyncJob: AsyncJob { } func dequeue(_ context: QueueContext, _ payload: Data) async throws { - promise.succeed(()) - return + self.promise.succeed() } } diff --git a/Tests/QueuesTests/QueueTests.swift b/Tests/QueuesTests/QueueTests.swift index 562287b..cd30736 100644 --- a/Tests/QueuesTests/QueueTests.swift +++ b/Tests/QueuesTests/QueueTests.swift @@ -1,353 +1,553 @@ import Atomics import Queues -import Vapor -import Foundation import XCTest import XCTVapor import XCTQueues -import NIOCore import NIOConcurrencyHelpers -@testable import Vapor + +func XCTAssertEqualAsync( + _ expression1: @autoclosure () async throws -> T, + _ expression2: @autoclosure () async throws -> T, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async where T: Equatable { + do { + let expr1 = try await expression1(), expr2 = try await expression2() + return XCTAssertEqual(expr1, expr2, message(), file: file, line: line) + } catch { + return XCTAssertEqual(try { () -> Bool in throw error }(), false, message(), file: file, line: line) + } +} + +func XCTAssertTrueAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertTrue(result, message(), file: file, line: line) + } catch { + return XCTAssertTrue(try { throw error }(), message(), file: file, line: line) + } +} + +func XCTAssertFalseAsync( + _ predicate: @autoclosure () async throws -> Bool, + _ message: @autoclosure () -> String = "", + file: StaticString = #filePath, line: UInt = #line +) async { + do { + let result = try await predicate() + XCTAssertFalse(result, message(), file: file, line: line) + } catch { + return XCTAssertFalse(try { throw error }(), message(), file: file, line: line) + } +} final class QueueTests: XCTestCase { - func testVaporIntegrationWithInProcessJob() throws { - let app = Application(.testing) - app.queues.use(.test) - defer { app.shutdown() } - - let jobSignal = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: jobSignal)) - try app.queues.startInProcessJobs(on: .default) + var app: Application! + + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + + override func setUp() async throws { + self.app = try await Application.make(.testing) + self.app.queues.use(.test) + } - app.get("bar") { req in - req.queue.dispatch(Foo.self, .init(foo: "Bar payload")) - .map { _ in "job bar dispatched" } + override func tearDown() async throws { + try await self.app.asyncShutdown() + self.app = nil + } + + func testVaporIntegrationWithInProcessJob() async throws { + let jobSignal1 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: jobSignal1)) + let jobSignal2 = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo2(promise: jobSignal2)) + try self.app.queues.startInProcessJobs(on: .default) + + self.app.get("bar1") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "Bar payload")).get() + return "job bar dispatched" + } + + self.app.get("bar2") { req in + try await req.queue.dispatch(Foo2.self, .init(foo: "Bar payload")) + return "job bar dispatched" } - try app.testable().test(.GET, "bar") { res in + try await self.app.testable().test(.GET, "bar1") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "job bar dispatched") + }.test(.GET, "bar2") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "job bar dispatched") } - try XCTAssertEqual(jobSignal.futureResult.wait(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal1.futureResult.get(), "Bar payload") + await XCTAssertEqualAsync(try await jobSignal2.futureResult.get(), "Bar payload") } - func testVaporIntegration() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar")) - .map { _ in "done" } + func testVaporIntegration() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: promise)) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Foo.self) - XCTAssert(app.queues.test.contains(Foo.self)) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) XCTAssertNotNil(job) XCTAssertEqual(job!.foo, "bar") - try app.queues.queue.worker.run().wait() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testSettingCustomId() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) - .map { _ in "done" } + func testSettingCustomId() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + self.app.queues.add(Foo1(promise: promise)) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar"), id: JobIdentifier(string: "my-custom-id")) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - XCTAssertTrue(app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + XCTAssert(self.app.queues.test.jobs.keys.map(\.string).contains("my-custom-id")) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testScheduleBuilderAPI() throws { - let app = Application(.testing) - defer { app.shutdown() } - + func testScheduleBuilderAPI() async throws { // yearly - app.queues.schedule(Cleanup()) - .yearly() - .in(.may) - .on(23) - .at(.noon) + self.app.queues.schedule(Cleanup()).yearly().in(.may).on(23).at(.noon) // monthly - app.queues.schedule(Cleanup()) - .monthly() - .on(15) - .at(.midnight) + self.app.queues.schedule(Cleanup()).monthly().on(15).at(.midnight) // weekly - app.queues.schedule(Cleanup()) - .weekly() - .on(.monday) - .at("3:13am") + self.app.queues.schedule(Cleanup()).weekly().on(.monday).at("3:13am") // daily - app.queues.schedule(Cleanup()) - .daily() - .at("5:23pm") + self.app.queues.schedule(Cleanup()).daily().at("5:23pm") // daily 2 - app.queues.schedule(Cleanup()) - .daily() - .at(5, 23, .pm) + self.app.queues.schedule(Cleanup()).daily().at(5, 23, .pm) // daily 3 - app.queues.schedule(Cleanup()) - .daily() - .at(17, 23) + self.app.queues.schedule(Cleanup()).daily().at(17, 23) // hourly - app.queues.schedule(Cleanup()) - .hourly() - .at(30) + self.app.queues.schedule(Cleanup()).hourly().at(30) } - func testRepeatingScheduledJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + func testRepeatingScheduledJob() async throws { + let scheduledJob = TestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + self.app.queues.schedule(scheduledJob).everySecond() + try self.app.queues.startScheduledJobs() - XCTAssertEqual(TestingScheduledJob.count.load(ordering: .relaxed), 0) - app.queues.schedule(TestingScheduledJob()).everySecond() - try app.queues.startScheduledJobs() + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) + promise.succeed() + } + + try await promise.futureResult.get() + } + + func testAsyncRepeatingScheduledJob() async throws { + let scheduledJob = AsyncTestingScheduledJob() + XCTAssertEqual(scheduledJob.count.load(ordering: .relaxed), 0) + self.app.queues.schedule(scheduledJob).everySecond() + try self.app.queues.startScheduledJobs() - let promise = app.eventLoopGroup.next().makePromise(of: Void.self) - app.eventLoopGroup.next().scheduleTask(in: .seconds(5)) { () -> Void in - XCTAssert(TestingScheduledJob.count.load(ordering: .relaxed) > 4) - promise.succeed(()) + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(5)) { + XCTAssert(scheduledJob.count.load(ordering: .relaxed) > 4) + promise.succeed() } - try promise.futureResult.wait() + try await promise.futureResult.get() } - func testFailingScheduledJob() throws { - let app = Application(.testing) - defer { app.shutdown() } + func testFailingScheduledJob() async throws { + self.app.queues.schedule(FailingScheduledJob()).everySecond() + try self.app.queues.startScheduledJobs() - app.queues.schedule(FailingScheduledJob()).everySecond() - try app.queues.startScheduledJobs() + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { + promise.succeed() + } + try await promise.futureResult.get() + } + + func testAsyncFailingScheduledJob() async throws { + self.app.queues.schedule(AsyncFailingScheduledJob()).everySecond() + try self.app.queues.startScheduledJobs() - let promise = app.eventLoopGroup.next().makePromise(of: Void.self) - app.eventLoopGroup.next().scheduleTask(in: .seconds(1)) { () -> Void in - promise.succeed(()) + let promise = self.app.eventLoopGroup.any().makePromise(of: Void.self) + self.app.eventLoopGroup.any().scheduleTask(in: .seconds(1)) { + promise.succeed() } - try promise.futureResult.wait() + try await promise.futureResult.get() } - func testCustomWorkerCount() throws { + func testCustomWorkerCount() async throws { // Setup custom ELG with 4 threads let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 4) - defer { try! eventLoopGroup.syncShutdownGracefully() } + + do { + let count = self.app.eventLoopGroup.any().makePromise(of: Int.self) + self.app.queues.use(custom: WorkerCountDriver(count: count)) + // Limit worker count to less than 4 threads + self.app.queues.configuration.workerCount = 2 + + try self.app.queues.startInProcessJobs(on: .default) + await XCTAssertEqualAsync(try await count.futureResult.get(), 2) + } catch { + try? await eventLoopGroup.shutdownGracefully() + throw error + } + try await eventLoopGroup.shutdownGracefully() + } - let app = Application(.testing, .shared(eventLoopGroup)) - defer { app.shutdown() } + func testSuccessHooks() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = SuccessHook() + let errorHook = ErrorHook() + let dispatchHook = DispatchHook() + let dequeuedHook = DequeuedHook() + self.app.queues.add(Foo1(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + self.app.queues.add(dispatchHook) + self.app.queues.add(dequeuedHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" + } - let count = app.eventLoopGroup.next().makePromise(of: Int.self) - app.queues.use(custom: WorkerCountDriver(count: count)) - // Limit worker count to less than 4 threads - app.queues.configuration.workerCount = 2 + XCTAssertFalse(dispatchHook.successHit) + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + XCTAssertTrue(dispatchHook.successHit) + } - try app.queues.startInProcessJobs(on: .default) - try XCTAssertEqual(count.futureResult.wait(), 2) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + XCTAssertFalse(dequeuedHook.successHit) + + try await self.app.queues.queue.worker.run() + XCTAssertTrue(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + XCTAssertTrue(dequeuedHook.successHit) + + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") } - func testSuccessHooks() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) + func testAsyncSuccessHooks() async throws { + let promise = self.app.eventLoopGroup.any().makePromise(of: String.self) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + let dispatchHook = AsyncDispatchHook() + let dequeuedHook = AsyncDequeuedHook() + self.app.queues.add(Foo1(promise: promise)) + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + self.app.queues.add(dispatchHook) + self.app.queues.add(dequeuedHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Foo1.self, .init(foo: "bar")) + return "done" + } + + await XCTAssertFalseAsync(await dispatchHook.successHit) + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + await XCTAssertTrueAsync(await dispatchHook.successHit) + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Foo1.self) + XCTAssert(self.app.queues.test.contains(Foo1.self)) + XCTAssertNotNil(job) + XCTAssertEqual(job!.foo, "bar") + await XCTAssertFalseAsync(await dequeuedHook.successHit) - let promise = app.eventLoopGroup.next().makePromise(of: String.self) - app.queues.add(Foo(promise: promise)) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - app.queues.add(DispatchHook()) - app.queues.add(DequeuedHook()) - ErrorHook.errorCount = 0 - DequeuedHook.successHit = false + try await self.app.queues.queue.worker.run() + await XCTAssertTrueAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + await XCTAssertTrueAsync(await dequeuedHook.successHit) - app.get("foo") { req in - req.queue.dispatch(Foo.self, .init(foo: "bar")) - .map { _ in "done" } + await XCTAssertEqualAsync(try await promise.futureResult.get(), "bar") + } + + func testFailureHooks() async throws { + self.app.queues.use(.test) + self.app.queues.add(Bar()) + let successHook = SuccessHook() + let errorHook = ErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) + return "done" } - XCTAssertEqual(DispatchHook.successHit, false) - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") - XCTAssertEqual(DispatchHook.successHit, true) } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Foo.self) - XCTAssert(app.queues.test.contains(Foo.self)) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Bar.self) + XCTAssert(self.app.queues.test.contains(Bar.self)) XCTAssertNotNil(job) - XCTAssertEqual(job!.foo, "bar") - XCTAssertEqual(DequeuedHook.successHit, false) - - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, true) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - XCTAssertEqual(DequeuedHook.successHit, true) + + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testAsyncFailureHooks() async throws { + self.app.queues.use(.test) + self.app.queues.add(Bar()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) - try XCTAssertEqual(promise.futureResult.wait(), "bar") + self.app.get("foo") { req in + try await req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) + return "done" + } + + try await self.app.testable().test(.GET, "foo") { res async in + XCTAssertEqual(res.status, .ok) + XCTAssertEqual(res.body.string, "done") + } + + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + let job = self.app.queues.test.first(Bar.self) + XCTAssert(self.app.queues.test.contains(Bar.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) } - func testFailureHooks() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - app.queues.add(Bar()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 + func testFailureHooksWithDelay() async throws { + self.app.queues.add(Baz()) + let successHook = SuccessHook() + let errorHook = ErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) - app.get("foo") { req in - req.queue.dispatch(Bar.self, .init(foo: "bar"), maxRetryCount: 3) - .map { _ in "done" } + self.app.get("foo") { req in + try await req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - let job = app.queues.test.first(Bar.self) - XCTAssert(app.queues.test.contains(Bar.self)) + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + var job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) + XCTAssertNotNil(job) + + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) - } - - func testFailureHooksWithDelay() throws { - let app = Application(.testing) - defer { app.shutdown() } - app.queues.use(.test) - app.queues.add(Baz()) - app.queues.add(SuccessHook()) - app.queues.add(ErrorHook()) - ErrorHook.errorCount = 0 - - app.get("foo") { req in - req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) - .map { _ in "done" } + sleep(1) + + try await self.app.queues.queue.worker.run() + XCTAssertFalse(successHook.successHit) + XCTAssertEqual(errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testAsyncFailureHooksWithDelay() async throws { + self.app.queues.add(Baz()) + let successHook = AsyncSuccessHook() + let errorHook = AsyncErrorHook() + self.app.queues.add(successHook) + self.app.queues.add(errorHook) + + self.app.get("foo") { req in + try await req.queue.dispatch(Baz.self, .init(foo: "baz"), maxRetryCount: 1) + return "done" } - try app.testable().test(.GET, "foo") { res in + try await self.app.testable().test(.GET, "foo") { res async in XCTAssertEqual(res.status, .ok) XCTAssertEqual(res.body.string, "done") } - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - var job = app.queues.test.first(Baz.self) - XCTAssert(app.queues.test.contains(Baz.self)) + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + var job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 0) - XCTAssertEqual(app.queues.test.queue.count, 1) - XCTAssertEqual(app.queues.test.jobs.count, 1) - job = app.queues.test.first(Baz.self) - XCTAssert(app.queues.test.contains(Baz.self)) + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 0) + XCTAssertEqual(self.app.queues.test.queue.count, 1) + XCTAssertEqual(self.app.queues.test.jobs.count, 1) + job = self.app.queues.test.first(Baz.self) + XCTAssert(self.app.queues.test.contains(Baz.self)) XCTAssertNotNil(job) sleep(1) - try app.queues.queue.worker.run().wait() - XCTAssertEqual(SuccessHook.successHit, false) - XCTAssertEqual(ErrorHook.errorCount, 1) - XCTAssertEqual(app.queues.test.queue.count, 0) - XCTAssertEqual(app.queues.test.jobs.count, 0) + try await self.app.queues.queue.worker.run() + await XCTAssertFalseAsync(await successHook.successHit) + await XCTAssertEqualAsync(await errorHook.errorCount, 1) + XCTAssertEqual(self.app.queues.test.queue.count, 0) + XCTAssertEqual(self.app.queues.test.jobs.count, 0) + } + + func testStuffThatIsntActuallyUsedAnywhere() { + XCTAssertEqual(self.app.queues.queue(.default).key, "vapor_queues[default]") + XCTAssertNotNil(QueuesEventLoopPreference.indifferent.delegate(for: self.app.eventLoopGroup)) + XCTAssertNotNil(QueuesEventLoopPreference.delegate(on: self.app.eventLoopGroup.any()).delegate(for: self.app.eventLoopGroup)) } } -class DispatchHook: JobEventDelegate { - static var successHit = false +final class DispatchHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func dispatched(job: JobEventData, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func dispatched(job: JobEventData, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -class SuccessHook: JobEventDelegate { - static var successHit = false +final class SuccessHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func success(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func success(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -class ErrorHook: JobEventDelegate { - static var errorCount = 0 +final class ErrorHook: JobEventDelegate, @unchecked Sendable { + var errorCount = 0 - func error(jobId: String, error: Error, eventLoop: EventLoop) -> EventLoopFuture { - Self.errorCount += 1 - return eventLoop.future() + func error(jobId: String, error: any Error, eventLoop: any EventLoop) -> EventLoopFuture { + self.errorCount += 1 + return eventLoop.makeSucceededVoidFuture() } } -class DequeuedHook: JobEventDelegate { - static var successHit = false +final class DequeuedHook: JobEventDelegate, @unchecked Sendable { + var successHit = false - func didDequeue(jobId: String, eventLoop: EventLoop) -> EventLoopFuture { - Self.successHit = true - return eventLoop.future() + func didDequeue(jobId: String, eventLoop: any EventLoop) -> EventLoopFuture { + self.successHit = true + return eventLoop.makeSucceededVoidFuture() } } -final class WorkerCountDriver: QueuesDriver { +actor AsyncDispatchHook: AsyncJobEventDelegate { + var successHit = false + func dispatched(job: JobEventData) async throws { self.successHit = true } +} + +actor AsyncSuccessHook: AsyncJobEventDelegate { + var successHit = false + func success(jobId: String) async throws { self.successHit = true } +} + +actor AsyncErrorHook: AsyncJobEventDelegate { + var errorCount = 0 + func error(jobId: String, error: any Error) async throws { self.errorCount += 1 } +} + +actor AsyncDequeuedHook: AsyncJobEventDelegate { + var successHit = false + func didDequeue(jobId: String) async throws { self.successHit = true } +} + +final class WorkerCountDriver: QueuesDriver, @unchecked Sendable { let count: EventLoopPromise let lock: NIOLock var recordedEventLoops: Set @@ -358,11 +558,11 @@ final class WorkerCountDriver: QueuesDriver { self.recordedEventLoops = [] } - func makeQueue(with context: QueueContext) -> Queue { + func makeQueue(with context: QueueContext) -> any Queue { WorkerCountQueue(driver: self, context: context) } - func record(eventLoop: EventLoop) { + func record(eventLoop: any EventLoop) { self.lock.lock() defer { self.lock.unlock() } let previousCount = self.recordedEventLoops.count @@ -381,53 +581,60 @@ final class WorkerCountDriver: QueuesDriver { let driver: WorkerCountDriver var context: QueueContext - func get(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } - - func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { - fatalError() - } - - func clear(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } - + func get(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } + func set(_ id: JobIdentifier, to data: JobData) -> EventLoopFuture { fatalError() } + func clear(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } func pop() -> EventLoopFuture { self.driver.record(eventLoop: self.context.eventLoop) return self.context.eventLoop.makeSucceededFuture(nil) } - - func push(_ id: JobIdentifier) -> EventLoopFuture { - fatalError() - } + func push(_ id: JobIdentifier) -> EventLoopFuture { fatalError() } } } -struct Failure: Error { } +struct Failure: Error {} + struct FailingScheduledJob: ScheduledJob { - func run(context: QueueContext) -> EventLoopFuture { - context.eventLoop.makeFailedFuture(Failure()) - } + func run(context: QueueContext) -> EventLoopFuture { context.eventLoop.makeFailedFuture(Failure()) } +} + +struct AsyncFailingScheduledJob: AsyncScheduledJob { + func run(context: QueueContext) async throws { throw Failure() } } struct TestingScheduledJob: ScheduledJob { - static var count = ManagedAtomic(0) + var count = ManagedAtomic(0) func run(context: QueueContext) -> EventLoopFuture { - TestingScheduledJob.count.wrappingIncrement(ordering: .relaxed) - return context.eventLoop.future() + self.count.wrappingIncrement(ordering: .relaxed) + return context.eventLoop.makeSucceededVoidFuture() } } -extension ByteBuffer { - var string: String { - return .init(decoding: self.readableBytesView, as: UTF8.self) - } +struct AsyncTestingScheduledJob: AsyncScheduledJob { + var count = ManagedAtomic(0) + func run(context: QueueContext) async throws { self.count.wrappingIncrement(ordering: .relaxed) } } +struct Foo1: Job { + let promise: EventLoopPromise + + struct Data: Codable { + var foo: String + } + + func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { + self.promise.succeed(data.foo) + return context.eventLoop.makeSucceededVoidFuture() + } + + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + self.promise.fail(error) + return context.eventLoop.makeSucceededVoidFuture() + } +} -struct Foo: Job { +struct Foo2: Job { let promise: EventLoopPromise struct Data: Codable { @@ -436,12 +643,12 @@ struct Foo: Job { func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { self.promise.succeed(data.foo) - return context.eventLoop.makeSucceededFuture(()) + return context.eventLoop.makeSucceededVoidFuture() } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { self.promise.fail(error) - return context.eventLoop.makeSucceededFuture(()) + return context.eventLoop.makeSucceededVoidFuture() } } @@ -451,11 +658,11 @@ struct Bar: Job { } func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeFailedFuture(Abort(.badRequest)) + context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + context.eventLoop.makeSucceededVoidFuture() } } @@ -465,15 +672,14 @@ struct Baz: Job { } func dequeue(_ context: QueueContext, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeFailedFuture(Abort(.badRequest)) + context.eventLoop.makeFailedFuture(Abort(.badRequest)) } - func error(_ context: QueueContext, _ error: Error, _ data: Data) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + func error(_ context: QueueContext, _ error: any Error, _ data: Data) -> EventLoopFuture { + context.eventLoop.makeSucceededVoidFuture() } func nextRetryIn(attempt: Int) -> Int { - return attempt + attempt } } - diff --git a/Tests/QueuesTests/ScheduleBuilderTests.swift b/Tests/QueuesTests/ScheduleBuilderTests.swift index acd0f2f..4eb4dd9 100644 --- a/Tests/QueuesTests/ScheduleBuilderTests.swift +++ b/Tests/QueuesTests/ScheduleBuilderTests.swift @@ -1,9 +1,12 @@ import Foundation import Queues import XCTest -import NIOCore final class ScheduleBuilderTests: XCTestCase { + override class func setUp() { + XCTAssert(isLoggingConfigured) + } + func testHourlyBuilder() throws { let builder = ScheduleBuilder() builder.hourly().at(30) @@ -137,7 +140,6 @@ final class ScheduleBuilderTests: XCTestCase { } func testCustomCalendarBuilder() throws { - let est = Calendar.calendar(timezone: "EST") let mst = Calendar.calendar(timezone: "MST") @@ -153,14 +155,13 @@ final class ScheduleBuilderTests: XCTestCase { // one hour later Date(calendar: est, hour: 21, minute: 00) ) - } } final class Cleanup: ScheduledJob { func run(context: QueueContext) -> EventLoopFuture { - return context.eventLoop.makeSucceededFuture(()) + context.eventLoop.makeSucceededVoidFuture() } } diff --git a/Tests/QueuesTests/Utilities.swift b/Tests/QueuesTests/Utilities.swift new file mode 100644 index 0000000..90cb197 --- /dev/null +++ b/Tests/QueuesTests/Utilities.swift @@ -0,0 +1,15 @@ +import Logging +import class Foundation.ProcessInfo + +func env(_ name: String) -> String? { + return ProcessInfo.processInfo.environment[name] +} + +let isLoggingConfigured: Bool = { + LoggingSystem.bootstrap { label in + var handler = StreamLogHandler.standardOutput(label: label) + handler.logLevel = env("LOG_LEVEL").flatMap { .init(rawValue: $0) } ?? .info + return handler + } + return true +}()