From a50b4ae72427a6511e51f7b220dece64805301c2 Mon Sep 17 00:00:00 2001 From: Joerg Date: Tue, 20 Jan 2026 11:46:19 +0100 Subject: [PATCH] feat: add sync job cancel and rollback with real-time updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement comprehensive sync job management with rollback capabilities and real-time status updates on the dashboard. ## Features ### Cancel & Rollback - Users can cancel failed or stale (>1h) sync jobs - Rollback deletes added QSOs and restores updated QSOs to previous state - Uses qso_changes table to track all modifications with before/after snapshots - Server-side validation prevents cancelling completed or active jobs ### Database Changes - Add qso_changes table to track QSO modifications per job - Stores change type (added/updated), before/after data snapshots - Enables precise rollback of sync operations - Migration script included ### Real-time Updates - Dashboard now polls for job updates every 2 seconds - Smart polling: starts when jobs active, stops when complete - Job status badges update in real-time (pending → running → completed) - Cancel button appears/disappears based on job state ### Backend - Fixed job ordering to show newest first (desc createdAt) - Track all QSO changes during LoTW/DCL sync operations - cancelJob() function handles rollback logic - DELETE /api/jobs/:jobId endpoint for cancelling jobs ### Frontend - jobsAPI.cancel() method for cancelling jobs - Dashboard shows last 5 sync jobs with status, stats, duration - Real-time job status updates via polling - Cancel button with confirmation dialog - Loading state and error handling ### Logging Fix - Changed from Bun.write() to fs.appendFile() for reliable log appending - Logs now persist across server restarts instead of being truncated Co-Authored-By: Claude Sonnet 4.5 --- src/backend/config.js | 14 +- src/backend/db/schema/index.js | 23 ++- src/backend/index.js | 37 +++++ .../migrations/add-qso-changes-table.js | 74 +++++++++ src/backend/services/dcl.service.js | 62 ++++++- src/backend/services/job-queue.service.js | 115 ++++++++++++- src/backend/services/lotw.service.js | 51 +++++- src/frontend/src/lib/api.js | 1 + src/frontend/src/routes/+page.svelte | 151 ++++++++++++++++-- 9 files changed, 502 insertions(+), 26 deletions(-) create mode 100644 src/backend/migrations/add-qso-changes-table.js diff --git a/src/backend/config.js b/src/backend/config.js index 0203df8..56831fb 100644 --- a/src/backend/config.js +++ b/src/backend/config.js @@ -2,7 +2,7 @@ import Database from 'bun:sqlite'; import { drizzle } from 'drizzle-orm/bun-sqlite'; import * as schema from './db/schema/index.js'; import { join, dirname } from 'path'; -import { existsSync, mkdirSync } from 'fs'; +import { existsSync, mkdirSync, appendFile } from 'fs'; import { fileURLToPath } from 'url'; // =================================================================== @@ -50,9 +50,9 @@ function log(level, message, data) { const logMessage = formatLogMessage(level, message, data); - // Write to file asynchronously (fire and forget for performance) - Bun.write(backendLogFile, logMessage, { createPath: true }).catch(err => { - console.error('Failed to write to log file:', err); + // Append to file asynchronously (fire and forget for performance) + appendFile(backendLogFile, logMessage, (err) => { + if (err) console.error('Failed to write to log file:', err); }); // Also log to console in development @@ -90,9 +90,9 @@ export function logToFrontend(level, message, data = null, context = {}) { logMessage += '\n'; - // Write to frontend log file - Bun.write(frontendLogFile, logMessage, { createPath: true }).catch(err => { - console.error('Failed to write to frontend log file:', err); + // Append to frontend log file + appendFile(frontendLogFile, logMessage, (err) => { + if (err) console.error('Failed to write to frontend log file:', err); }); } diff --git a/src/backend/db/schema/index.js b/src/backend/db/schema/index.js index efc4584..b86701b 100644 --- a/src/backend/db/schema/index.js +++ b/src/backend/db/schema/index.js @@ -181,5 +181,26 @@ export const syncJobs = sqliteTable('sync_jobs', { createdAt: integer('created_at', { mode: 'timestamp' }).notNull().$defaultFn(() => new Date()), }); +/** + * @typedef {Object} QSOChange + * @property {number} id + * @property {number} jobId + * @property {number|null} qsoId + * @property {string} changeType - 'added' or 'updated' + * @property {string|null} beforeData - JSON snapshot before change (for updates) + * @property {string|null} afterData - JSON snapshot after change + * @property {Date} createdAt + */ + +export const qsoChanges = sqliteTable('qso_changes', { + id: integer('id').primaryKey({ autoIncrement: true }), + jobId: integer('job_id').notNull().references(() => syncJobs.id), + qsoId: integer('qso_id').references(() => qsos.id), // null for added QSOs until created + changeType: text('change_type').notNull(), // 'added' or 'updated' + beforeData: text('before_data'), // JSON snapshot before change + afterData: text('after_data'), // JSON snapshot after change + createdAt: integer('created_at', { mode: 'timestamp' }).notNull().$defaultFn(() => new Date()), +}); + // Export all schemas -export const schema = { users, qsos, awards, awardProgress, syncJobs }; +export const schema = { users, qsos, awards, awardProgress, syncJobs, qsoChanges }; diff --git a/src/backend/index.js b/src/backend/index.js index 30e8e6d..ea86082 100644 --- a/src/backend/index.js +++ b/src/backend/index.js @@ -20,6 +20,7 @@ import { getJobStatus, getUserActiveJob, getUserJobs, + cancelJob, } from './services/job-queue.service.js'; import { getAllAwards, @@ -485,6 +486,42 @@ const app = new Elysia() } }) + /** + * DELETE /api/jobs/:jobId + * Cancel and rollback a sync job (requires authentication) + * Only allows cancelling failed, completed, or stale running jobs (>1 hour) + */ + .delete('/api/jobs/:jobId', async ({ user, params, set }) => { + if (!user) { + set.status = 401; + return { success: false, error: 'Unauthorized' }; + } + + try { + const jobId = parseInt(params.jobId); + if (isNaN(jobId)) { + set.status = 400; + return { success: false, error: 'Invalid job ID' }; + } + + const result = await cancelJob(jobId, user.id); + + if (!result.success) { + set.status = 400; + return result; + } + + return result; + } catch (error) { + logger.error('Error cancelling job', { error: error.message, userId: user?.id, jobId: params.jobId }); + set.status = 500; + return { + success: false, + error: 'Failed to cancel job', + }; + } + }) + /** * GET /api/qsos * Get user's QSOs (requires authentication) diff --git a/src/backend/migrations/add-qso-changes-table.js b/src/backend/migrations/add-qso-changes-table.js new file mode 100644 index 0000000..613432a --- /dev/null +++ b/src/backend/migrations/add-qso-changes-table.js @@ -0,0 +1,74 @@ +/** + * Migration: Add qso_changes table for sync job rollback + * + * This script adds the qso_changes table which tracks all QSO modifications + * made by sync jobs, enabling rollback functionality for failed or stale jobs. + */ + +import Database from 'bun:sqlite'; +import { join, dirname } from 'path'; +import { fileURLToPath } from 'url'; + +// ES module equivalent of __dirname +const __filename = fileURLToPath(import.meta.url); +const __dirname = dirname(__filename); + +const dbPath = join(__dirname, '../award.db'); +const sqlite = new Database(dbPath); + +async function migrate() { + console.log('Starting migration: Add qso_changes table...'); + + try { + // Check if table already exists + const tableExists = sqlite.query(` + SELECT name FROM sqlite_master + WHERE type='table' AND name='qso_changes' + `).get(); + + if (tableExists) { + console.log('Table qso_changes already exists. Migration complete.'); + sqlite.close(); + return; + } + + // Create qso_changes table + sqlite.exec(` + CREATE TABLE qso_changes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL, + qso_id INTEGER, + change_type TEXT NOT NULL, + before_data TEXT, + after_data TEXT, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now') * 1000), + FOREIGN KEY (job_id) REFERENCES sync_jobs(id) ON DELETE CASCADE, + FOREIGN KEY (qso_id) REFERENCES qsos(id) ON DELETE CASCADE + ) + `); + + // Create index for faster lookups during rollback + sqlite.exec(` + CREATE INDEX idx_qso_changes_job_id ON qso_changes(job_id) + `); + + // Create index for change_type lookups + sqlite.exec(` + CREATE INDEX idx_qso_changes_change_type ON qso_changes(change_type) + `); + + console.log('Migration complete! Created qso_changes table with indexes.'); + } catch (error) { + console.error('Migration failed:', error); + sqlite.close(); + process.exit(1); + } + + sqlite.close(); +} + +// Run migration +migrate().then(() => { + console.log('Migration script completed successfully'); + process.exit(0); +}); diff --git a/src/backend/services/dcl.service.js b/src/backend/services/dcl.service.js index f4b8d06..1542274 100644 --- a/src/backend/services/dcl.service.js +++ b/src/backend/services/dcl.service.js @@ -1,5 +1,5 @@ import { db, logger } from '../config.js'; -import { qsos } from '../db/schema/index.js'; +import { qsos, qsoChanges } from '../db/schema/index.js'; import { max, sql, eq, and, desc } from 'drizzle-orm'; import { updateJobProgress } from './job-queue.service.js'; import { parseDCLResponse, normalizeBand, normalizeMode } from '../utils/adif-parser.js'; @@ -253,6 +253,18 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null existingQSO.grid !== (dbQSO.grid || existingQSO.grid); if (dataChanged) { + // Record before state for rollback + const beforeData = JSON.stringify({ + dclQslRstatus: existingQSO.dclQslRstatus, + dclQslRdate: existingQSO.dclQslRdate, + darcDok: existingQSO.darcDok, + myDarcDok: existingQSO.myDarcDok, + grid: existingQSO.grid, + gridSource: existingQSO.gridSource, + entity: existingQSO.entity, + entityId: existingQSO.entityId, + }); + // Update existing QSO with changed DCL confirmation and DOK data const updateData = { dclQslRdate: dbQSO.dclQslRdate, @@ -291,9 +303,34 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null .update(qsos) .set(updateData) .where(eq(qsos.id, existingQSO.id)); + + // Record after state for rollback + const afterData = JSON.stringify({ + dclQslRstatus: dbQSO.dclQslRstatus, + dclQslRdate: dbQSO.dclQslRdate, + darcDok: updateData.darcDok, + myDarcDok: updateData.myDarcDok, + grid: updateData.grid, + gridSource: updateData.gridSource, + entity: updateData.entity, + entityId: updateData.entityId, + }); + + // Track change in qso_changes table if jobId provided + if (jobId) { + await db.insert(qsoChanges).values({ + jobId, + qsoId: existingQSO.id, + changeType: 'updated', + beforeData, + afterData, + }); + } + updatedCount++; // Track updated QSO (CALL and DATE) updatedQSOs.push({ + id: existingQSO.id, callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, @@ -305,10 +342,31 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null } } else { // Insert new QSO - await db.insert(qsos).values(dbQSO); + const [newQSO] = await db.insert(qsos).values(dbQSO).returning(); + + // Track change in qso_changes table if jobId provided + if (jobId) { + const afterData = JSON.stringify({ + callsign: dbQSO.callsign, + qsoDate: dbQSO.qsoDate, + timeOn: dbQSO.timeOn, + band: dbQSO.band, + mode: dbQSO.mode, + }); + + await db.insert(qsoChanges).values({ + jobId, + qsoId: newQSO.id, + changeType: 'added', + beforeData: null, + afterData, + }); + } + addedCount++; // Track added QSO (CALL and DATE) addedQSOs.push({ + id: newQSO.id, callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, diff --git a/src/backend/services/job-queue.service.js b/src/backend/services/job-queue.service.js index 651c5b9..a5e3ba1 100644 --- a/src/backend/services/job-queue.service.js +++ b/src/backend/services/job-queue.service.js @@ -1,6 +1,6 @@ import { db, logger } from '../config.js'; -import { syncJobs } from '../db/schema/index.js'; -import { eq, and, or, lt } from 'drizzle-orm'; +import { syncJobs, qsoChanges, qsos } from '../db/schema/index.js'; +import { eq, and, or, lt, desc } from 'drizzle-orm'; /** * Simplified Background Job Queue Service @@ -252,7 +252,7 @@ export async function getUserActiveJob(userId, jobType = null) { .select() .from(syncJobs) .where(and(...conditions)) - .orderBy(syncJobs.createdAt) + .orderBy(desc(syncJobs.createdAt)) .limit(1); return job || null; @@ -269,7 +269,7 @@ export async function getUserJobs(userId, limit = 10) { .select() .from(syncJobs) .where(eq(syncJobs.userId, userId)) - .orderBy(syncJobs.createdAt) + .orderBy(desc(syncJobs.createdAt)) .limit(limit); return jobs.map((job) => { @@ -342,3 +342,110 @@ export async function updateJobProgress(jobId, progressData) { result: JSON.stringify(updatedData), }); } + +/** + * Cancel and rollback a sync job + * Deletes added QSOs and restores updated QSOs to their previous state + * @param {number} jobId - Job ID to cancel + * @param {number} userId - User ID (for security check) + * @returns {Promise} Result of cancellation + */ +export async function cancelJob(jobId, userId) { + logger.info('Cancelling job', { jobId, userId }); + + // Get job to verify ownership + const job = await getJob(jobId); + if (!job) { + return { success: false, error: 'Job not found' }; + } + + // Verify user owns this job + if (job.userId !== userId) { + return { success: false, error: 'Forbidden' }; + } + + // Only allow cancelling failed jobs or stale running jobs + const isStale = job.status === JobStatus.RUNNING && job.startedAt && + (Date.now() - new Date(job.startedAt).getTime()) > 60 * 60 * 1000; // 1 hour + + if (job.status === JobStatus.PENDING) { + return { success: false, error: 'Cannot cancel pending jobs' }; + } + + if (job.status === JobStatus.COMPLETED) { + return { success: false, error: 'Cannot cancel completed jobs' }; + } + + if (job.status === JobStatus.RUNNING && !isStale) { + return { success: false, error: 'Cannot cancel active jobs (only stale jobs older than 1 hour)' }; + } + + // Get all QSO changes for this job + const changes = await db + .select() + .from(qsoChanges) + .where(eq(qsoChanges.jobId, jobId)); + + let deletedAdded = 0; + let restoredUpdated = 0; + + for (const change of changes) { + if (change.changeType === 'added' && change.qsoId) { + // Delete the QSO that was added + await db.delete(qsos).where(eq(qsos.id, change.qsoId)); + deletedAdded++; + } else if (change.changeType === 'updated' && change.qsoId && change.beforeData) { + // Restore the QSO to its previous state + try { + const beforeData = JSON.parse(change.beforeData); + + // Build update object based on job type + const updateData = {}; + + if (job.type === 'lotw_sync') { + if (beforeData.lotwQslRstatus !== undefined) updateData.lotwQslRstatus = beforeData.lotwQslRstatus; + if (beforeData.lotwQslRdate !== undefined) updateData.lotwQslRdate = beforeData.lotwQslRdate; + } else if (job.type === 'dcl_sync') { + if (beforeData.dclQslRstatus !== undefined) updateData.dclQslRstatus = beforeData.dclQslRstatus; + if (beforeData.dclQslRdate !== undefined) updateData.dclQslRdate = beforeData.dclQslRdate; + if (beforeData.darcDok !== undefined) updateData.darcDok = beforeData.darcDok; + if (beforeData.myDarcDok !== undefined) updateData.myDarcDok = beforeData.myDarcDok; + if (beforeData.grid !== undefined) updateData.grid = beforeData.grid; + if (beforeData.gridSource !== undefined) updateData.gridSource = beforeData.gridSource; + if (beforeData.entity !== undefined) updateData.entity = beforeData.entity; + if (beforeData.entityId !== undefined) updateData.entityId = beforeData.entityId; + } + + if (Object.keys(updateData).length > 0) { + await db.update(qsos).set(updateData).where(eq(qsos.id, change.qsoId)); + restoredUpdated++; + } + } catch (error) { + logger.error('Failed to restore QSO', { qsoId: change.qsoId, error: error.message }); + } + } + } + + // Delete all change records for this job + await db.delete(qsoChanges).where(eq(qsoChanges.jobId, jobId)); + + // Update job status to cancelled + await updateJob(jobId, { + status: 'cancelled', + completedAt: new Date(), + result: JSON.stringify({ + cancelled: true, + deletedAdded, + restoredUpdated, + }), + }); + + logger.info('Job cancelled successfully', { jobId, deletedAdded, restoredUpdated }); + + return { + success: true, + message: `Job cancelled: ${deletedAdded} QSOs deleted, ${restoredUpdated} QSOs restored`, + deletedAdded, + restoredUpdated, + }; +} diff --git a/src/backend/services/lotw.service.js b/src/backend/services/lotw.service.js index fa81f07..652b346 100644 --- a/src/backend/services/lotw.service.js +++ b/src/backend/services/lotw.service.js @@ -1,5 +1,5 @@ import { db, logger } from '../config.js'; -import { qsos } from '../db/schema/index.js'; +import { qsos, qsoChanges } from '../db/schema/index.js'; import { max, sql, eq, and, or, desc, like } from 'drizzle-orm'; import { updateJobProgress } from './job-queue.service.js'; import { parseADIF, normalizeBand, normalizeMode } from '../utils/adif-parser.js'; @@ -258,6 +258,12 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n existingQSO.lotwQslRdate !== dbQSO.lotwQslRdate; if (confirmationChanged) { + // Record before state for rollback + const beforeData = JSON.stringify({ + lotwQslRstatus: existingQSO.lotwQslRstatus, + lotwQslRdate: existingQSO.lotwQslRdate, + }); + await db .update(qsos) .set({ @@ -266,9 +272,28 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n lotwSyncedAt: dbQSO.lotwSyncedAt, }) .where(eq(qsos.id, existingQSO.id)); + + // Record after state for rollback + const afterData = JSON.stringify({ + lotwQslRstatus: dbQSO.lotwQslRstatus, + lotwQslRdate: dbQSO.lotwQslRdate, + }); + + // Track change in qso_changes table if jobId provided + if (jobId) { + await db.insert(qsoChanges).values({ + jobId, + qsoId: existingQSO.id, + changeType: 'updated', + beforeData, + afterData, + }); + } + updatedCount++; // Track updated QSO (CALL and DATE) updatedQSOs.push({ + id: existingQSO.id, callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, @@ -279,10 +304,32 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n skippedCount++; } } else { - await db.insert(qsos).values(dbQSO); + // Insert new QSO + const [newQSO] = await db.insert(qsos).values(dbQSO).returning(); + + // Track change in qso_changes table if jobId provided + if (jobId) { + const afterData = JSON.stringify({ + callsign: dbQSO.callsign, + qsoDate: dbQSO.qsoDate, + timeOn: dbQSO.timeOn, + band: dbQSO.band, + mode: dbQSO.mode, + }); + + await db.insert(qsoChanges).values({ + jobId, + qsoId: newQSO.id, + changeType: 'added', + beforeData: null, + afterData, + }); + } + addedCount++; // Track added QSO (CALL and DATE) addedQSOs.push({ + id: newQSO.id, callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, diff --git a/src/frontend/src/lib/api.js b/src/frontend/src/lib/api.js index 786af04..f4ccfef 100644 --- a/src/frontend/src/lib/api.js +++ b/src/frontend/src/lib/api.js @@ -84,4 +84,5 @@ export const jobsAPI = { getStatus: (jobId) => apiRequest(`/jobs/${jobId}`), getActive: () => apiRequest('/jobs/active'), getRecent: (limit = 10) => apiRequest(`/jobs?limit=${limit}`), + cancel: (jobId) => apiRequest(`/jobs/${jobId}`, { method: 'DELETE' }), }; diff --git a/src/frontend/src/routes/+page.svelte b/src/frontend/src/routes/+page.svelte index 2854b81..43cbd39 100644 --- a/src/frontend/src/routes/+page.svelte +++ b/src/frontend/src/routes/+page.svelte @@ -1,11 +1,53 @@