Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
83.33% covered (warning)
83.33%
65 / 78
70.00% covered (warning)
70.00%
7 / 10
CRAP
0.00% covered (danger)
0.00%
0 / 1
BatchQueue
83.33% covered (warning)
83.33%
65 / 78
70.00% covered (warning)
70.00%
7 / 10
20.67
0.00% covered (danger)
0.00%
0 / 1
 create_table
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 __construct
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 push
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
3
 pull
89.47% covered (warning)
89.47%
17 / 19
0.00% covered (danger)
0.00%
0 / 1
7.06
 remove_events_exceeding_attempts_limit
100.00% covered (success)
100.00%
6 / 6
100.00% covered (success)
100.00%
1 / 1
1
 increment_attempt
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
1
 remove
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 reserve
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 release
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
1
 count
100.00% covered (success)
100.00%
7 / 7
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3namespace NewfoldLabs\WP\Module\Data\EventQueue\Queues;
4
5use NewfoldLabs\WP\Module\Data\Event;
6use NewfoldLabs\WP\Module\Data\EventQueue\Queryable;
7use NewfoldLabs\WP\ModuleLoader\Container;
8
9/**
10 * A table for storing events to later process.
11 *
12 * id | event | attempts | reserved_at | available_at | created_at
13 */
14class BatchQueue implements BatchQueueInterface {
15
16    use Queryable;
17
18    /**
19     * Dependency injection container
20     *
21     * @used-by Queryable::query()
22     * @used-by Queryable::table()
23     *
24     * @var Container $container
25     */
26    protected $container;
27
28    /**
29     * Create the `nfd_data_event_queue` table.
30     *
31     * Uses the `dbDelta` function to create the table if it doesn't exist.
32     *
33     * Used by activation hook and upgrade handler.
34     */
35    public static function create_table(): void {
36        global $wpdb;
37
38        if ( ! function_exists( 'dbDelta' ) ) {
39            require ABSPATH . 'wp-admin/includes/upgrade.php';
40        }
41
42        $wpdb->hide_errors();
43
44        $charset_collate = $wpdb->get_charset_collate();
45
46        $sql = <<<SQL
47                CREATE TABLE {$wpdb->prefix}nfd_data_event_queue (
48                    id bigint(20) NOT NULL AUTO_INCREMENT,
49                    event longtext NOT NULL,
50                    attempts tinyint(3) NOT NULL DEFAULT 0,
51                    reserved_at datetime DEFAULT NULL,
52                    available_at datetime NOT NULL,
53                    created_at datetime NOT NULL,
54                    PRIMARY KEY (id)
55                    ) $charset_collate;
56                SQL;
57
58        dbDelta( $sql );
59    }
60
61    /**
62     * Constructor
63     *
64     * @param  Container $container Dependency injection container for query object and table name.
65     */
66    public function __construct( Container $container ) {
67        $this->container = $container;
68    }
69
70    /**
71     * Push events onto the queue
72     *
73     * @param  non-empty-array<Event> $events The events to store in the queue.
74     *
75     * @return bool
76     */
77    public function push( array $events ) {
78
79        $time = current_time( 'mysql' );
80
81        $inserts = array();
82        foreach ( $events as $event ) {
83            $inserts[] = array(
84                // phpcs:disable WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
85                'event'        => serialize( $event ),
86                'available_at' => $time,
87                'created_at'   => $event->created_at ?? $time,
88                // Events are stored for later if they have failed once, except pageviews which are never sent synchronously.
89                'attempts'     => 'pageview' === $event->key ? 0 : 1,
90            );
91        }
92
93        return (bool) $this->bulkInsert( $this->table(), $inserts );
94    }
95
96    /**
97     * Pull events from the queue
98     *
99     * @param int $count The number of events to pull (limit).
100     *
101     * @return Event[]
102     */
103    public function pull( int $count ) {
104
105        $events = array();
106
107        $raw_events = $this
108            ->query()
109            ->select( '*' )
110            ->from( $this->table(), false )
111            ->whereNull( 'reserved_at' )
112            ->where( 'available_at', '<=', current_time( 'mysql' ) )
113            ->order_by( 'available_at' )
114            ->limit( $count )
115            ->get();
116
117        if ( ! is_array( $raw_events ) ) {
118            return $events;
119        }
120
121        foreach ( $raw_events as $raw_event ) {
122            if ( property_exists( $raw_event, 'id' ) && property_exists( $raw_event, 'event' ) ) {
123                $event_data = maybe_unserialize( $raw_event->event );
124                if ( is_array( $event_data ) && property_exists( $raw_event, 'created_at' ) ) {
125                    $event_data['created_at'] = $raw_event->created_at;
126                }
127                $events[ $raw_event->id ] = $event_data;
128            }
129        }
130
131        return $events;
132    }
133    /**
134     * Remove events from the queue that have exceeded the attempts limit
135     *
136     * @param  int $limit number of attempts
137     * @return bool
138     */
139    public function remove_events_exceeding_attempts_limit( $limit ) {
140        return (bool) $this
141            ->query()
142            ->select( '*' )
143            ->from( $this->table(), false )
144            ->where( 'attempts', '>=', $limit )
145            ->delete();
146    }
147
148    /**
149     * Increment the attempts for a given event
150     *
151     * @param  int[] $ids list of ids to increment
152     *
153     * @return bool
154     */
155    public function increment_attempt( array $ids ) {
156        global $wpdb;
157
158        $table = $this->table();
159        $ids   = array_map( 'intval', $ids );
160
161        $placeholders = implode( ',', array_fill( 0, count( $ids ), '%d' ) );
162
163        return (bool) $wpdb->query(
164            $wpdb->prepare(
165                "UPDATE {$table} SET attempts = attempts + 1 WHERE id IN ($placeholders)", // phpcs:ignore WordPress.DB.PreparedSQLPlaceholders.UnfinishedPrepare, WordPress.DB.PreparedSQL.InterpolatedNotPrepared
166                ...$ids
167            )
168        );
169    }
170
171    /**
172     * Remove events from the queue
173     *
174     * @param  int[] $ids list of ids to remove
175     *
176     * @return bool
177     */
178    public function remove( array $ids ) {
179        return (bool) $this
180            ->query()
181            ->table( $this->table(), false )
182            ->whereIn( 'id', $ids )
183            ->delete();
184    }
185
186    /**
187     * Reserve events in the queue
188     *
189     * @param  int[] $ids list of ids to reserve
190     *
191     * @return bool
192     */
193    public function reserve( array $ids ) {
194        return (bool) $this
195            ->query()
196            ->table( $this->table(), false )
197            ->whereIn( 'id', $ids )
198            ->update( array( 'reserved_at' => current_time( 'mysql' ) ) );
199    }
200
201    /**
202     * Release events back onto the queue
203     *
204     * @param  int[] $ids list of ids to release
205     *
206     * @return bool
207     */
208    public function release( array $ids ) {
209        return (bool) $this
210            ->query()
211            ->table( $this->table(), false )
212            ->whereIn( 'id', $ids )
213            ->update( array( 'reserved_at' => null ) );
214    }
215
216    /**
217     * Count the number of events in the queue
218     *
219     * @return int
220     */
221    public function count() {
222        return $this
223            ->query()
224            ->select( '*' )
225            ->from( $this->table(), false )
226            ->whereNull( 'reserved_at' )
227            ->where( 'available_at', '<=', current_time( 'mysql' ) )
228            ->count();
229    }
230}