Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
83.33% covered (warning)
83.33%
65 / 78
70.00% covered (warning)
70.00%
7 / 10
CRAP
0.00% covered (danger)
0.00%
0 / 1
BatchQueue
83.33% covered (warning)
83.33%
65 / 78
70.00% covered (warning)
70.00%
7 / 10
19.50
0.00% covered (danger)
0.00%
0 / 1
 create_table
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 __construct
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 push
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
2
 pull
89.47% covered (warning)
89.47%
17 / 19
0.00% covered (danger)
0.00%
0 / 1
7.06
 remove_events_exceeding_attempts_limit
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 increment_attempt
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
1
 remove
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 reserve
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 release
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 count
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3namespace NewfoldLabs\WP\Module\Data\EventQueue\Queues;
4
5use NewfoldLabs\WP\Module\Data\Event;
6use NewfoldLabs\WP\Module\Data\EventQueue\Queryable;
7use NewfoldLabs\WP\ModuleLoader\Container;
8
/**
 * Database-backed queue that stores events so they can be processed later in batches.
 *
 * Columns of the table created by {@see BatchQueue::create_table()}:
 *
 * id | event | attempts | reserved_at | available_at | created_at
 */
class BatchQueue implements BatchQueueInterface {

    use Queryable;

    /**
     * Dependency injection container
     *
     * @used-by Queryable::query()
     * @used-by Queryable::table()
     *
     * @var Container $container
     */
    protected $container;

    /**
     * Create the `nfd_data_event_queue` table.
     *
     * Uses the `dbDelta` function to create the table if it doesn't exist.
     *
     * Used by activation hook and upgrade handler.
     *
     * Database errors are hidden via `$wpdb->hide_errors()` before `dbDelta` runs.
     */
    public static function create_table(): void {
        global $wpdb;

        // `dbDelta()` is only defined once the upgrade include has been loaded.
        if ( ! function_exists( 'dbDelta' ) ) {
            require_once ABSPATH . 'wp-admin/includes/upgrade.php';
        }

        $wpdb->hide_errors();

        $charset_collate = $wpdb->get_charset_collate();

        // dbDelta is strict about formatting: one column per line and two spaces after `PRIMARY KEY`.
        $sql = <<<SQL
                CREATE TABLE {$wpdb->prefix}nfd_data_event_queue (
                    id bigint(20) NOT NULL AUTO_INCREMENT,
                    event longtext NOT NULL,
                    attempts tinyint(3) NOT NULL DEFAULT 0,
                    reserved_at datetime DEFAULT NULL,
                    available_at datetime NOT NULL,
                    created_at datetime NOT NULL,
                    PRIMARY KEY  (id)
                    ) $charset_collate;
                SQL;

        dbDelta( $sql );
    }

    /**
     * Constructor
     *
     * @param  Container $container Dependency injection container for query object and table name.
     */
    public function __construct( Container $container ) {
        $this->container = $container;
    }

    /**
     * Push events onto the queue
     *
     * Each event is stored serialized, is available immediately (`available_at` is the current
     * time) and keeps its own `created_at` when it has one.
     *
     * NOTE(review): rows are inserted with `attempts` = 1 although the column default is 0;
     * presumably events are only queued after a first failed attempt. Confirm with callers.
     *
     * @param  non-empty-array<Event> $events The events to store in the queue.
     *
     * @return bool True when the bulk insert reported success, false otherwise (including when
     *              `$events` is empty).
     */
    public function push( array $events ) {

        // Nothing to insert: avoid calling bulkInsert() with zero rows.
        if ( empty( $events ) ) {
            return false;
        }

        $time = current_time( 'mysql' );

        $inserts = array();
        foreach ( $events as $event ) {
            $inserts[] = array(
                // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
                'event'        => serialize( $event ),
                'available_at' => $time,
                'created_at'   => $event->created_at ?? $time,
                'attempts'     => 1,
            );
        }

        return (bool) $this->bulkInsert( $this->table(), $inserts );
    }

    /**
     * Pull events from the queue
     *
     * Selects rows that are not reserved (`reserved_at` is null) and whose `available_at` is not
     * in the future, ordered by `available_at`. Rows are only read here; they are neither
     * reserved nor removed.
     *
     * @param int $count The number of events to pull (limit).
     *
     * @return Event[] Unserialized events keyed by their row id; an empty array when the query
     *                 does not return an array.
     */
    public function pull( int $count ) {

        $events = array();

        $raw_events = $this
            ->query()
            ->select( '*' )
            ->from( $this->table(), false )
            ->whereNull( 'reserved_at' )
            ->where( 'available_at', '<=', current_time( 'mysql' ) )
            ->order_by( 'available_at' )
            ->limit( $count )
            ->get();

        if ( ! is_array( $raw_events ) ) {
            return $events;
        }

        foreach ( $raw_events as $raw_event ) {
            // Skip rows that lack the columns needed to rebuild and key the event.
            if ( ! property_exists( $raw_event, 'id' ) || ! property_exists( $raw_event, 'event' ) ) {
                continue;
            }

            // NOTE(review): this unserializes data read from the database; keep the table trusted.
            $event_data = maybe_unserialize( $raw_event->event );

            // The stored creation time is only copied onto array payloads; objects are left as-is.
            if ( is_array( $event_data ) && property_exists( $raw_event, 'created_at' ) ) {
                $event_data['created_at'] = $raw_event->created_at;
            }

            $events[ $raw_event->id ] = $event_data;
        }

        return $events;
    }

    /**
     * Remove events from the queue that have reached the attempts limit
     *
     * Deletes every row whose `attempts` value is greater than or equal to `$limit`.
     *
     * @param  int $limit number of attempts
     * @return bool
     */
    public function remove_events_exceeding_attempts_limit( $limit ) {
        return (bool) $this
            ->query()
            ->select( '*' )
            ->from( $this->table(), false )
            ->where( 'attempts', '>=', $limit )
            ->delete();
    }

    /**
     * Increment the attempts for the given events
     *
     * @param  int[] $ids list of ids to increment
     *
     * @return bool False when `$ids` is empty or the query affected no rows / failed.
     */
    public function increment_attempt( array $ids ) {
        global $wpdb;

        // An empty list would build the invalid SQL `IN ()` and a prepare() call without placeholders.
        if ( empty( $ids ) ) {
            return false;
        }

        $table = $this->table();
        // array_values() keeps the spread below positional even if the caller passed keyed ids.
        $ids = array_values( array_map( 'intval', $ids ) );

        // One `%d` placeholder per id.
        $placeholders = implode( ',', array_fill( 0, count( $ids ), '%d' ) );

        return (bool) $wpdb->query(
            $wpdb->prepare(
                "UPDATE {$table} SET attempts = attempts + 1 WHERE id IN ($placeholders)", // phpcs:ignore WordPress.DB.PreparedSQLPlaceholders.UnfinishedPrepare, WordPress.DB.PreparedSQL.InterpolatedNotPrepared
                ...$ids
            )
        );
    }

    /**
     * Remove events from the queue
     *
     * @param  int[] $ids list of ids to remove
     *
     * @return bool False when `$ids` is empty.
     */
    public function remove( array $ids ) {
        // Never issue a DELETE with an empty id filter.
        if ( empty( $ids ) ) {
            return false;
        }

        return (bool) $this
            ->query()
            ->table( $this->table(), false )
            ->whereIn( 'id', $ids )
            ->delete();
    }

    /**
     * Reserve events in the queue
     *
     * Stamps `reserved_at` with the current time; `pull()` and `count()` only consider rows whose
     * `reserved_at` is null.
     *
     * @param  int[] $ids list of ids to reserve
     *
     * @return bool False when `$ids` is empty.
     */
    public function reserve( array $ids ) {
        // Never issue an UPDATE with an empty id filter.
        if ( empty( $ids ) ) {
            return false;
        }

        return (bool) $this
            ->query()
            ->table( $this->table(), false )
            ->whereIn( 'id', $ids )
            ->update( array( 'reserved_at' => current_time( 'mysql' ) ) );
    }

    /**
     * Release events back onto the queue
     *
     * Resets `reserved_at` to null so the rows match the `pull()` and `count()` filters again.
     *
     * @param  int[] $ids list of ids to release
     *
     * @return bool False when `$ids` is empty.
     */
    public function release( array $ids ) {
        // Never issue an UPDATE with an empty id filter.
        if ( empty( $ids ) ) {
            return false;
        }

        return (bool) $this
            ->query()
            ->table( $this->table(), false )
            ->whereIn( 'id', $ids )
            ->update( array( 'reserved_at' => null ) );
    }

    /**
     * Count the events that are currently available to pull
     *
     * Uses the same filter as `pull()`: not reserved and `available_at` not in the future.
     *
     * @return int
     */
    public function count() {
        // Cast so the documented integer return holds whatever the query builder hands back.
        return (int) $this
            ->query()
            ->select( '*' )
            ->from( $this->table(), false )
            ->whereNull( 'reserved_at' )
            ->where( 'available_at', '<=', current_time( 'mysql' ) )
            ->count();
    }
}