feat: add sync job cancel and rollback with real-time updates

Implement comprehensive sync job management with rollback capabilities
and real-time status updates on the dashboard.

## Features

### Cancel & Rollback
- Users can cancel failed or stale (>1h) sync jobs
- Rollback deletes added QSOs and restores updated QSOs to previous state
- Uses qso_changes table to track all modifications with before/after snapshots
- Server-side validation prevents cancelling completed or active jobs

### Database Changes
- Add qso_changes table to track QSO modifications per job
- Stores change type (added/updated), before/after data snapshots
- Enables precise rollback of sync operations
- Migration script included

### Real-time Updates
- Dashboard now polls for job updates every 2 seconds
- Smart polling: starts when jobs active, stops when complete
- Job status badges update in real-time (pending → running → completed)
- Cancel button appears/disappears based on job state

### Backend
- Fixed job ordering to show newest first (desc createdAt)
- Track all QSO changes during LoTW/DCL sync operations
- cancelJob() function handles rollback logic
- DELETE /api/jobs/:jobId endpoint for cancelling jobs

### Frontend
- jobsAPI.cancel() method for cancelling jobs
- Dashboard shows last 5 sync jobs with status, stats, duration
- Real-time job status updates via polling
- Cancel button with confirmation dialog
- Loading state and error handling

### Logging Fix
- Changed from Bun.write() to fs.appendFile() for reliable log appending
- Logs now persist across server restarts instead of being truncated

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-20 11:46:19 +01:00
parent 56be3c0702
commit a50b4ae724
9 changed files with 502 additions and 26 deletions

View File

@@ -1,5 +1,5 @@
import { db, logger } from '../config.js';
import { qsos } from '../db/schema/index.js';
import { qsos, qsoChanges } from '../db/schema/index.js';
import { max, sql, eq, and, desc } from 'drizzle-orm';
import { updateJobProgress } from './job-queue.service.js';
import { parseDCLResponse, normalizeBand, normalizeMode } from '../utils/adif-parser.js';
@@ -253,6 +253,18 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null
existingQSO.grid !== (dbQSO.grid || existingQSO.grid);
if (dataChanged) {
// Record before state for rollback
const beforeData = JSON.stringify({
dclQslRstatus: existingQSO.dclQslRstatus,
dclQslRdate: existingQSO.dclQslRdate,
darcDok: existingQSO.darcDok,
myDarcDok: existingQSO.myDarcDok,
grid: existingQSO.grid,
gridSource: existingQSO.gridSource,
entity: existingQSO.entity,
entityId: existingQSO.entityId,
});
// Update existing QSO with changed DCL confirmation and DOK data
const updateData = {
dclQslRdate: dbQSO.dclQslRdate,
@@ -291,9 +303,34 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null
.update(qsos)
.set(updateData)
.where(eq(qsos.id, existingQSO.id));
// Record after state for rollback
const afterData = JSON.stringify({
dclQslRstatus: dbQSO.dclQslRstatus,
dclQslRdate: dbQSO.dclQslRdate,
darcDok: updateData.darcDok,
myDarcDok: updateData.myDarcDok,
grid: updateData.grid,
gridSource: updateData.gridSource,
entity: updateData.entity,
entityId: updateData.entityId,
});
// Track change in qso_changes table if jobId provided
if (jobId) {
await db.insert(qsoChanges).values({
jobId,
qsoId: existingQSO.id,
changeType: 'updated',
beforeData,
afterData,
});
}
updatedCount++;
// Track updated QSO (CALL and DATE)
updatedQSOs.push({
id: existingQSO.id,
callsign: dbQSO.callsign,
date: dbQSO.qsoDate,
band: dbQSO.band,
@@ -305,10 +342,31 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null
}
} else {
// Insert new QSO
await db.insert(qsos).values(dbQSO);
const [newQSO] = await db.insert(qsos).values(dbQSO).returning();
// Track change in qso_changes table if jobId provided
if (jobId) {
const afterData = JSON.stringify({
callsign: dbQSO.callsign,
qsoDate: dbQSO.qsoDate,
timeOn: dbQSO.timeOn,
band: dbQSO.band,
mode: dbQSO.mode,
});
await db.insert(qsoChanges).values({
jobId,
qsoId: newQSO.id,
changeType: 'added',
beforeData: null,
afterData,
});
}
addedCount++;
// Track added QSO (CALL and DATE)
addedQSOs.push({
id: newQSO.id,
callsign: dbQSO.callsign,
date: dbQSO.qsoDate,
band: dbQSO.band,

View File

@@ -1,6 +1,6 @@
import { db, logger } from '../config.js';
import { syncJobs } from '../db/schema/index.js';
import { eq, and, or, lt } from 'drizzle-orm';
import { syncJobs, qsoChanges, qsos } from '../db/schema/index.js';
import { eq, and, or, lt, desc } from 'drizzle-orm';
/**
* Simplified Background Job Queue Service
@@ -252,7 +252,7 @@ export async function getUserActiveJob(userId, jobType = null) {
.select()
.from(syncJobs)
.where(and(...conditions))
.orderBy(syncJobs.createdAt)
.orderBy(desc(syncJobs.createdAt))
.limit(1);
return job || null;
@@ -269,7 +269,7 @@ export async function getUserJobs(userId, limit = 10) {
.select()
.from(syncJobs)
.where(eq(syncJobs.userId, userId))
.orderBy(syncJobs.createdAt)
.orderBy(desc(syncJobs.createdAt))
.limit(limit);
return jobs.map((job) => {
@@ -342,3 +342,110 @@ export async function updateJobProgress(jobId, progressData) {
result: JSON.stringify(updatedData),
});
}
/**
* Cancel and rollback a sync job
* Deletes added QSOs and restores updated QSOs to their previous state
* @param {number} jobId - Job ID to cancel
* @param {number} userId - User ID (for security check)
* @returns {Promise<Object>} Result of cancellation
*/
export async function cancelJob(jobId, userId) {
logger.info('Cancelling job', { jobId, userId });
// Get job to verify ownership
const job = await getJob(jobId);
if (!job) {
return { success: false, error: 'Job not found' };
}
// Verify user owns this job
if (job.userId !== userId) {
return { success: false, error: 'Forbidden' };
}
// Only allow cancelling failed jobs or stale running jobs
const isStale = job.status === JobStatus.RUNNING && job.startedAt &&
(Date.now() - new Date(job.startedAt).getTime()) > 60 * 60 * 1000; // 1 hour
if (job.status === JobStatus.PENDING) {
return { success: false, error: 'Cannot cancel pending jobs' };
}
if (job.status === JobStatus.COMPLETED) {
return { success: false, error: 'Cannot cancel completed jobs' };
}
if (job.status === JobStatus.RUNNING && !isStale) {
return { success: false, error: 'Cannot cancel active jobs (only stale jobs older than 1 hour)' };
}
// Get all QSO changes for this job
const changes = await db
.select()
.from(qsoChanges)
.where(eq(qsoChanges.jobId, jobId));
let deletedAdded = 0;
let restoredUpdated = 0;
for (const change of changes) {
if (change.changeType === 'added' && change.qsoId) {
// Delete the QSO that was added
await db.delete(qsos).where(eq(qsos.id, change.qsoId));
deletedAdded++;
} else if (change.changeType === 'updated' && change.qsoId && change.beforeData) {
// Restore the QSO to its previous state
try {
const beforeData = JSON.parse(change.beforeData);
// Build update object based on job type
const updateData = {};
if (job.type === 'lotw_sync') {
if (beforeData.lotwQslRstatus !== undefined) updateData.lotwQslRstatus = beforeData.lotwQslRstatus;
if (beforeData.lotwQslRdate !== undefined) updateData.lotwQslRdate = beforeData.lotwQslRdate;
} else if (job.type === 'dcl_sync') {
if (beforeData.dclQslRstatus !== undefined) updateData.dclQslRstatus = beforeData.dclQslRstatus;
if (beforeData.dclQslRdate !== undefined) updateData.dclQslRdate = beforeData.dclQslRdate;
if (beforeData.darcDok !== undefined) updateData.darcDok = beforeData.darcDok;
if (beforeData.myDarcDok !== undefined) updateData.myDarcDok = beforeData.myDarcDok;
if (beforeData.grid !== undefined) updateData.grid = beforeData.grid;
if (beforeData.gridSource !== undefined) updateData.gridSource = beforeData.gridSource;
if (beforeData.entity !== undefined) updateData.entity = beforeData.entity;
if (beforeData.entityId !== undefined) updateData.entityId = beforeData.entityId;
}
if (Object.keys(updateData).length > 0) {
await db.update(qsos).set(updateData).where(eq(qsos.id, change.qsoId));
restoredUpdated++;
}
} catch (error) {
logger.error('Failed to restore QSO', { qsoId: change.qsoId, error: error.message });
}
}
}
// Delete all change records for this job
await db.delete(qsoChanges).where(eq(qsoChanges.jobId, jobId));
// Update job status to cancelled
await updateJob(jobId, {
status: 'cancelled',
completedAt: new Date(),
result: JSON.stringify({
cancelled: true,
deletedAdded,
restoredUpdated,
}),
});
logger.info('Job cancelled successfully', { jobId, deletedAdded, restoredUpdated });
return {
success: true,
message: `Job cancelled: ${deletedAdded} QSOs deleted, ${restoredUpdated} QSOs restored`,
deletedAdded,
restoredUpdated,
};
}

View File

@@ -1,5 +1,5 @@
import { db, logger } from '../config.js';
import { qsos } from '../db/schema/index.js';
import { qsos, qsoChanges } from '../db/schema/index.js';
import { max, sql, eq, and, or, desc, like } from 'drizzle-orm';
import { updateJobProgress } from './job-queue.service.js';
import { parseADIF, normalizeBand, normalizeMode } from '../utils/adif-parser.js';
@@ -258,6 +258,12 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n
existingQSO.lotwQslRdate !== dbQSO.lotwQslRdate;
if (confirmationChanged) {
// Record before state for rollback
const beforeData = JSON.stringify({
lotwQslRstatus: existingQSO.lotwQslRstatus,
lotwQslRdate: existingQSO.lotwQslRdate,
});
await db
.update(qsos)
.set({
@@ -266,9 +272,28 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n
lotwSyncedAt: dbQSO.lotwSyncedAt,
})
.where(eq(qsos.id, existingQSO.id));
// Record after state for rollback
const afterData = JSON.stringify({
lotwQslRstatus: dbQSO.lotwQslRstatus,
lotwQslRdate: dbQSO.lotwQslRdate,
});
// Track change in qso_changes table if jobId provided
if (jobId) {
await db.insert(qsoChanges).values({
jobId,
qsoId: existingQSO.id,
changeType: 'updated',
beforeData,
afterData,
});
}
updatedCount++;
// Track updated QSO (CALL and DATE)
updatedQSOs.push({
id: existingQSO.id,
callsign: dbQSO.callsign,
date: dbQSO.qsoDate,
band: dbQSO.band,
@@ -279,10 +304,32 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n
skippedCount++;
}
} else {
await db.insert(qsos).values(dbQSO);
// Insert new QSO
const [newQSO] = await db.insert(qsos).values(dbQSO).returning();
// Track change in qso_changes table if jobId provided
if (jobId) {
const afterData = JSON.stringify({
callsign: dbQSO.callsign,
qsoDate: dbQSO.qsoDate,
timeOn: dbQSO.timeOn,
band: dbQSO.band,
mode: dbQSO.mode,
});
await db.insert(qsoChanges).values({
jobId,
qsoId: newQSO.id,
changeType: 'added',
beforeData: null,
afterData,
});
}
addedCount++;
// Track added QSO (CALL and DATE)
addedQSOs.push({
id: newQSO.id,
callsign: dbQSO.callsign,
date: dbQSO.qsoDate,
band: dbQSO.band,