db = Database::getInstance();
        $this->lockFile = sys_get_temp_dir() . '/integrity_check.lock';
        $this->initTables();
    }

    /**
     * Return the shared instance of this class, creating it on first use.
     *
     * @return self
     */
    public static function getInstance()
    {
        if (self::$instance === null) {
            self::$instance = new self();
        }

        return self::$instance;
    }

    /**
     * Create the `process_queue` and `queue_status` tables if they do not exist.
     *
     * `process_queue` holds one row per work item; `queue_status` holds one
     * summary row per task type (enforced by the unique key on `task_type`).
     */
    private function initTables()
    {
        // Create queue tables if they don't exist
        $this->db->query("
            CREATE TABLE IF NOT EXISTS process_queue (
                id INT AUTO_INCREMENT PRIMARY KEY,
                task_type VARCHAR(50) NOT NULL,
                data JSON,
                status ENUM('pending', 'processing', 'completed', 'failed') DEFAULT 'pending',
                attempts INT DEFAULT 0,
                error_message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                INDEX (status, task_type)
            )
        ");

        $this->db->query("
            CREATE TABLE IF NOT EXISTS queue_status (
                id INT AUTO_INCREMENT PRIMARY KEY,
                task_type VARCHAR(50) NOT NULL,
                total_items INT DEFAULT 0,
                processed_items INT DEFAULT 0,
                failed_items INT DEFAULT 0,
                is_running BOOLEAN DEFAULT FALSE,
                last_run TIMESTAMP NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY (task_type)
            )
        ");
    }

    /**
     * Replace the queue for a task type with a fresh set of items.
     *
     * Inside one transaction this deletes every existing `process_queue` row
     * for `$taskType`, inserts one row per element of `$items` (JSON-encoded),
     * and upserts the `queue_status` row with the new total, zeroed counters
     * and `is_running = TRUE`.
     *
     * @param string $taskType Task type identifier (column is VARCHAR(50)).
     * @param array  $items    Items to enqueue; each must be JSON-encodable.
     *
     * @return bool Always true when no exception is thrown.
     *
     * @throws Exception Re-thrown after a ROLLBACK if any statement throws.
     */
    public function startTask($taskType, $items)
    {
        try {
            $this->db->query("START TRANSACTION");

            // Clear existing queue for this task type. The task type is bound,
            // never interpolated, so it cannot alter the statement.
            $deleteStmt = $this->db->prepare("DELETE FROM process_queue WHERE task_type = ?");
            $deleteStmt->bind_param('s', $taskType);
            $deleteStmt->execute();

            // Insert new items
            $insertStmt = $this->db->prepare("
                INSERT INTO process_queue (task_type, data)
                VALUES (?, ?)
            ");

            foreach ($items as $item) {
                // bind_param() takes its arguments by reference, so the encoded
                // payload has to live in a variable rather than be passed inline.
                $encodedItem = json_encode($item);
                $insertStmt->bind_param('ss', $taskType, $encodedItem);
                $insertStmt->execute();
            }

            // Update queue status
            $itemCount = count($items);
            $statusStmt = $this->db->prepare("
                INSERT INTO queue_status (task_type, total_items, is_running)
                VALUES (?, ?, TRUE)
                ON DUPLICATE KEY UPDATE
                    total_items = VALUES(total_items),
                    processed_items = 0,
                    failed_items = 0,
                    is_running = TRUE,
                    last_run = NOW()
            ");
            $statusStmt->bind_param('si', $taskType, $itemCount);
            $statusStmt->execute();

            $this->db->query("COMMIT");

            return true;
        } catch (Exception $e) {
            $this->db->query("ROLLBACK");
            throw $e;
        }
    }

    /**
     * Claim the oldest pending item of a task type.
     *
     * Selects the lowest-id row that is `pending` with fewer than 3 attempts
     * using `FOR UPDATE`, flips it to `processing`, increments `attempts`, and
     * commits.
     *
     * @param string $taskType Task type identifier.
     *
     * @return array|null ['id' => ..., 'data' => decoded JSON] or null when
     *                    no eligible item exists.
     *
     * @throws Exception Re-thrown after a ROLLBACK if any statement throws.
     */
    public function getNextItem($taskType)
    {
        try {
            $this->db->query("START TRANSACTION");

            $selectStmt = $this->db->prepare("
                SELECT id, data
                FROM process_queue
                WHERE task_type = ?
                  AND status = 'pending'
                  AND attempts < 3
                ORDER BY id ASC
                LIMIT 1
                FOR UPDATE
            ");
            $selectStmt->bind_param('s', $taskType);
            $selectStmt->execute();
            $result = $selectStmt->get_result();
            $row = $result->fetch_assoc();

            if (!$row) {
                // Nothing to claim; still close the transaction opened above.
                $this->db->query("COMMIT");

                return null;
            }

            // Mark as processing
            $itemId = (int) $row['id'];
            $claimStmt = $this->db->prepare("
                UPDATE process_queue
                SET status = 'processing',
                    attempts = attempts + 1
                WHERE id = ?
            ");
            $claimStmt->bind_param('i', $itemId);
            $claimStmt->execute();

            $this->db->query("COMMIT");

            return [
                'id' => $row['id'],
                'data' => json_decode($row['data'], true)
            ];
        } catch (Exception $e) {
            $this->db->query("ROLLBACK");
            throw $e;
        }
    }

    /**
     * Set the status (and optional error message) of one queue item, then
     * refresh the aggregated counters in `queue_status`.
     *
     * @param int         $id           `process_queue.id` of the item.
     * @param string      $status       New status; the column is an ENUM of
     *                                  pending/processing/completed/failed.
     * @param string|null $errorMessage Stored in `error_message`; null clears it.
     */
    public function updateItemStatus($id, $status, $errorMessage = null)
    {
        $stmt = $this->db->prepare("
            UPDATE process_queue
            SET status = ?,
                error_message = ?
            WHERE id = ?
        ");
        $stmt->bind_param('ssi', $status, $errorMessage, $id);
        $stmt->execute();

        // Update queue status
        $this->updateQueueStatus();
    }

    /**
     * Recompute the counters of every `queue_status` row from `process_queue`.
     *
     * For each task type: `processed_items` = completed rows, `failed_items` =
     * failed rows, and `is_running` is true while any row is still pending or
     * processing. The statement has no WHERE clause, so all task types are
     * refreshed on every call.
     */
    public function updateQueueStatus()
    {
        $this->db->query("
            UPDATE queue_status qs
            SET processed_items = (
                    SELECT COUNT(*)
                    FROM process_queue
                    WHERE task_type = qs.task_type
                      AND status = 'completed'
                ),
                failed_items = (
                    SELECT COUNT(*)
                    FROM process_queue
                    WHERE task_type = qs.task_type
                      AND status = 'failed'
                ),
                is_running = EXISTS (
                    SELECT 1
                    FROM process_queue
                    WHERE task_type = qs.task_type
                      AND status IN ('pending', 'processing')
                )
        ");
    }

    /**
     * Fetch the `queue_status` row for a task type.
     *
     * NOTE(review): this now goes through a prepared statement; with mysqli
     * that may return numeric columns as native ints rather than strings -
     * confirm no caller relies on string-typed counters.
     *
     * @param string $taskType Task type identifier.
     *
     * @return array|null Associative row, or null when there is no row or the
     *                    query could not be run.
     */
    public function getQueueStatus($taskType)
    {
        $stmt = $this->db->prepare("
            SELECT *
            FROM queue_status
            WHERE task_type = ?
            LIMIT 1
        ");
        if (!$stmt) {
            return null;
        }

        $stmt->bind_param('s', $taskType);
        if (!$stmt->execute()) {
            return null;
        }

        $result = $stmt->get_result();

        return $result ? $result->fetch_assoc() : null;
    }

    /**
     * Report whether the lock file exists and is still fresh.
     *
     * A lock file whose modification time is more than 300 seconds old is
     * treated as stale: it is deleted and the method reports "not locked".
     *
     * @return bool True when a non-stale lock file exists.
     */
    public function isLocked()
    {
        // PHP caches stat() results per process; drop the cached entry so a
        // long-running worker sees lock files other processes created/removed.
        clearstatcache(true, $this->lockFile);

        if (!file_exists($this->lockFile)) {
            return false;
        }

        // Suppressed on purpose: the file may be removed by another process
        // between the existence check and this call.
        $lockTime = @filemtime($this->lockFile);
        if ($lockTime === false) {
            return false;
        }

        if (time() - $lockTime > 300) { // 5 minute timeout
            unlink($this->lockFile);

            return false;
        }

        return true;
    }

    /**
     * Try to take the lock by creating the lock file.
     *
     * The file is created with fopen() mode 'x', which fails if the file
     * already exists, so two processes racing past the isLocked() check
     * cannot both succeed.
     *
     * @return bool True when this call created the lock file, false otherwise.
     */
    public function acquireLock()
    {
        if ($this->isLocked()) {
            return false;
        }

        // Suppressed on purpose: losing the creation race is an expected
        // outcome and is reported through the return value.
        $handle = @fopen($this->lockFile, 'x');
        if ($handle === false) {
            return false;
        }

        fclose($handle);

        return true;
    }

    /**
     * Remove the lock file if it exists.
     */
    public function releaseLock()
    {
        clearstatcache(true, $this->lockFile);

        if (file_exists($this->lockFile)) {
            unlink($this->lockFile);
        }
    }
}
?>