Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
83.33% |
65 / 78 |
|
70.00% |
7 / 10 |
CRAP | |
0.00% |
0 / 1 |
BatchQueue | |
83.33% |
65 / 78 |
|
70.00% |
7 / 10 |
19.50 | |
0.00% |
0 / 1 |
create_table | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
6 | |||
__construct | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
push | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
2 | |||
pull | |
89.47% |
17 / 19 |
|
0.00% |
0 / 1 |
7.06 | |||
remove_events_exceeding_attempts_limit | |
100.00% |
6 / 6 |
|
100.00% |
1 / 1 |
1 | |||
increment_attempt | |
100.00% |
10 / 10 |
|
100.00% |
1 / 1 |
1 | |||
remove | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
reserve | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
release | |
100.00% |
5 / 5 |
|
100.00% |
1 / 1 |
1 | |||
count | |
100.00% |
7 / 7 |
|
100.00% |
1 / 1 |
1 |
1 | <?php |
2 | |
3 | namespace NewfoldLabs\WP\Module\Data\EventQueue\Queues; |
4 | |
5 | use NewfoldLabs\WP\Module\Data\Event; |
6 | use NewfoldLabs\WP\Module\Data\EventQueue\Queryable; |
7 | use NewfoldLabs\WP\ModuleLoader\Container; |
8 | |
9 | /** |
10 | * A table for storing events to later process. |
11 | * |
12 | * id | event | attempts | reserved_at | available_at | created_at |
13 | */ |
14 | class BatchQueue implements BatchQueueInterface { |
15 | |
16 | use Queryable; |
17 | |
18 | /** |
19 | * Dependency injection container |
20 | * |
21 | * @used-by Queryable::query() |
22 | * @used-by Queryable::table() |
23 | * |
24 | * @var Container $container |
25 | */ |
26 | protected $container; |
27 | |
28 | /** |
29 | * Create the `nfd_data_event_queue` table. |
30 | * |
31 | * Uses the `dbDelta` function to create the table if it doesn't exist. |
32 | * |
33 | * Used by activation hook and upgrade handler. |
34 | */ |
35 | public static function create_table(): void { |
36 | global $wpdb; |
37 | |
38 | if ( ! function_exists( 'dbDelta' ) ) { |
39 | require ABSPATH . 'wp-admin/includes/upgrade.php'; |
40 | } |
41 | |
42 | $wpdb->hide_errors(); |
43 | |
44 | $charset_collate = $wpdb->get_charset_collate(); |
45 | |
46 | $sql = <<<SQL |
47 | CREATE TABLE {$wpdb->prefix}nfd_data_event_queue ( |
48 | id bigint(20) NOT NULL AUTO_INCREMENT, |
49 | event longtext NOT NULL, |
50 | attempts tinyint(3) NOT NULL DEFAULT 0, |
51 | reserved_at datetime DEFAULT NULL, |
52 | available_at datetime NOT NULL, |
53 | created_at datetime NOT NULL, |
54 | PRIMARY KEY (id) |
55 | ) $charset_collate; |
56 | SQL; |
57 | |
58 | dbDelta( $sql ); |
59 | } |
60 | |
61 | /** |
62 | * Constructor |
63 | * |
64 | * @param Container $container Dependency injection container for query object and table name. |
65 | */ |
66 | public function __construct( Container $container ) { |
67 | $this->container = $container; |
68 | } |
69 | |
70 | /** |
71 | * Push events onto the queue |
72 | * |
73 | * @param non-empty-array<Event> $events The events to store in the queue. |
74 | * |
75 | * @return bool |
76 | */ |
77 | public function push( array $events ) { |
78 | |
79 | $time = current_time( 'mysql' ); |
80 | |
81 | $inserts = array(); |
82 | foreach ( $events as $event ) { |
83 | $inserts[] = array( |
84 | // phpcs:disable WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize |
85 | 'event' => serialize( $event ), |
86 | 'available_at' => $time, |
87 | 'created_at' => $event->created_at ?? $time, |
88 | 'attempts' => 1, |
89 | ); |
90 | } |
91 | |
92 | return (bool) $this->bulkInsert( $this->table(), $inserts ); |
93 | } |
94 | |
95 | /** |
96 | * Pull events from the queue |
97 | * |
98 | * @param int $count The number of events to pull (limit). |
99 | * |
100 | * @return Event[] |
101 | */ |
102 | public function pull( int $count ) { |
103 | |
104 | $events = array(); |
105 | |
106 | $raw_events = $this |
107 | ->query() |
108 | ->select( '*' ) |
109 | ->from( $this->table(), false ) |
110 | ->whereNull( 'reserved_at' ) |
111 | ->where( 'available_at', '<=', current_time( 'mysql' ) ) |
112 | ->order_by( 'available_at' ) |
113 | ->limit( $count ) |
114 | ->get(); |
115 | |
116 | if ( ! is_array( $raw_events ) ) { |
117 | return $events; |
118 | } |
119 | |
120 | foreach ( $raw_events as $raw_event ) { |
121 | if ( property_exists( $raw_event, 'id' ) && property_exists( $raw_event, 'event' ) ) { |
122 | $event_data = maybe_unserialize( $raw_event->event ); |
123 | if ( is_array( $event_data ) && property_exists( $raw_event, 'created_at' ) ) { |
124 | $event_data['created_at'] = $raw_event->created_at; |
125 | } |
126 | $events[ $raw_event->id ] = $event_data; |
127 | } |
128 | } |
129 | |
130 | return $events; |
131 | } |
132 | /** |
133 | * Remove events from the queue that have exceeded the attempts limit |
134 | * |
135 | * @param int $limit number of attempts |
136 | * @return bool |
137 | */ |
138 | public function remove_events_exceeding_attempts_limit( $limit ) { |
139 | return (bool) $this |
140 | ->query() |
141 | ->select( '*' ) |
142 | ->from( $this->table(), false ) |
143 | ->where( 'attempts', '>=', $limit ) |
144 | ->delete(); |
145 | } |
146 | |
147 | /** |
148 | * Increment the attempts for a given event |
149 | * |
150 | * @param int[] $ids list of ids to increment |
151 | * |
152 | * @return bool |
153 | */ |
154 | public function increment_attempt( array $ids ) { |
155 | global $wpdb; |
156 | |
157 | $table = $this->table(); |
158 | $ids = array_map( 'intval', $ids ); |
159 | |
160 | $placeholders = implode( ',', array_fill( 0, count( $ids ), '%d' ) ); |
161 | |
162 | return (bool) $wpdb->query( |
163 | $wpdb->prepare( |
164 | "UPDATE {$table} SET attempts = attempts + 1 WHERE id IN ($placeholders)", // phpcs:ignore WordPress.DB.PreparedSQLPlaceholders.UnfinishedPrepare, WordPress.DB.PreparedSQL.InterpolatedNotPrepared |
165 | ...$ids |
166 | ) |
167 | ); |
168 | } |
169 | |
170 | /** |
171 | * Remove events from the queue |
172 | * |
173 | * @param int[] $ids list of ids to remove |
174 | * |
175 | * @return bool |
176 | */ |
177 | public function remove( array $ids ) { |
178 | return (bool) $this |
179 | ->query() |
180 | ->table( $this->table(), false ) |
181 | ->whereIn( 'id', $ids ) |
182 | ->delete(); |
183 | } |
184 | |
185 | /** |
186 | * Reserve events in the queue |
187 | * |
188 | * @param int[] $ids list of ids to reserve |
189 | * |
190 | * @return bool |
191 | */ |
192 | public function reserve( array $ids ) { |
193 | return (bool) $this |
194 | ->query() |
195 | ->table( $this->table(), false ) |
196 | ->whereIn( 'id', $ids ) |
197 | ->update( array( 'reserved_at' => current_time( 'mysql' ) ) ); |
198 | } |
199 | |
200 | /** |
201 | * Release events back onto the queue |
202 | * |
203 | * @param int[] $ids list of ids to release |
204 | * |
205 | * @return bool |
206 | */ |
207 | public function release( array $ids ) { |
208 | return (bool) $this |
209 | ->query() |
210 | ->table( $this->table(), false ) |
211 | ->whereIn( 'id', $ids ) |
212 | ->update( array( 'reserved_at' => null ) ); |
213 | } |
214 | |
215 | /** |
216 | * Count the number of events in the queue |
217 | * |
218 | * @return int |
219 | */ |
220 | public function count() { |
221 | return $this |
222 | ->query() |
223 | ->select( '*' ) |
224 | ->from( $this->table(), false ) |
225 | ->whereNull( 'reserved_at' ) |
226 | ->where( 'available_at', '<=', current_time( 'mysql' ) ) |
227 | ->count(); |
228 | } |
229 | } |