realtimeService = new DashboardRealtimeService(); $this->authService = new AuthService(); } /** * GET /api/dashboard/stream * * SSE stream: every 2s check for new attendance, send event; timeout after 60s. * Uses after_id for incremental feed. User context from session applied for role filtering. * * @return ResponseInterface */ public function index(): ResponseInterface { ignore_user_abort(true); set_time_limit(0); while (ob_get_level() > 0) { ob_end_flush(); } $this->response->setHeader('Content-Type', 'text/event-stream'); $this->response->setHeader('Cache-Control', 'no-cache'); $this->response->setHeader('Connection', 'keep-alive'); $this->response->setHeader('X-Accel-Buffering', 'no'); // nginx $this->response->setBody(''); if (function_exists('apache_setenv')) { @apache_setenv('no-gzip', '1'); } @ini_set('output_buffering', 'off'); @ini_set('zlib.output_compression', false); $this->response->sendHeaders(); $currentUser = $this->authService->currentUser(); $startTime = time(); $lastId = (int) ($this->request->getGet('after_id') ?? 0); while (true) { if (time() - $startTime >= $this->timeoutSeconds) { $this->sendEvent('timeout', ['message' => 'Stream ended after ' . $this->timeoutSeconds . 's']); break; } $rows = $this->realtimeService->getAttendanceSinceId($lastId, 50, $currentUser); if (empty($rows)) { $this->sendEvent('heartbeat', ['ts' => gmdate('Y-m-d\TH:i:s\Z')]); } else { foreach ($rows as $row) { $this->sendEvent('attendance', $row); $lastId = max($lastId, $row['id']); } } flush(); sleep($this->intervalSeconds); } return $this->response; } /** * Send one SSE event (event name + data line) * * @param string $event Event name * @param array $data Data to send as JSON */ protected function sendEvent(string $event, array $data): void { echo 'event: ' . $event . "\n"; echo 'data: ' . json_encode($data, JSON_UNESCAPED_UNICODE) . "\n\n"; flush(); } }