Refactor LoTW sync with background job queue and Wavelog compatibility
Backend changes:
- Add sync_jobs table for background job tracking with Drizzle schema
- Create job queue service (job-queue.service.js) for async job processing
- Only ONE active sync job per user enforced at queue level
- Refactor LoTW service with Wavelog download logic:
  - Validate the response for "Username/password incorrect"
  - Check that the file starts with "ARRL Logbook of the World Status Report"
  - Use the last LoTW QSL date for incremental sync (qso_qslsince)
  - Wavelog-compatible timeouts and error handling
- Add deleteQSOs function to clear all user QSOs
- Fix database path to use an absolute path for consistency (see the sketch after this list)
- Register job processor for lotw_sync job type
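
A minimal sketch of what the absolute-path fix could look like in the database config. The config file itself is not part of this diff, so the better-sqlite3 driver and the relative layout below are assumptions:

    import path from 'node:path';
    import { fileURLToPath } from 'node:url';
    import Database from 'better-sqlite3';
    import { drizzle } from 'drizzle-orm/better-sqlite3';

    // Resolve award.db relative to this file rather than process.cwd(),
    // so every entry point opens the same src/backend/award.db file.
    const __dirname = path.dirname(fileURLToPath(import.meta.url));
    const dbPath = path.resolve(__dirname, '../award.db'); // assumed relative location

    export const db = drizzle(new Database(dbPath));
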
API endpoints:
- POST /api/lotw/sync - Queue background sync job, returns jobId immediately
- GET /api/jobs/:jobId - Get job status with progress tracking
- GET /api/jobs/active - Get user's active job
- GET /api/jobs - Get user's recent jobs
- DELETE /api/qsos/all - Delete all QSOs for authenticated user
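
A hedged sketch of how these endpoints might be wired up. Express is assumed, and requireAuth / req.user are placeholders; the actual route files are not included in this diff:

    import { Router } from 'express';
    import { enqueueJob, getJobStatus, JobType } from '../services/job-queue.service.js';
    import { deleteQSOs } from '../services/lotw.service.js';
    import { requireAuth } from '../middleware/auth.js'; // assumed auth middleware

    const router = Router();

    // POST /api/lotw/sync - queue the background sync and return the job ID immediately
    router.post('/api/lotw/sync', requireAuth, async (req, res) => {
      // Credentials might instead come from the stored user profile; the body is assumed here
      const { lotwUsername, lotwPassword } = req.body;
      const result = await enqueueJob(req.user.id, JobType.LOTW_SYNC, { lotwUsername, lotwPassword });
      if (!result.success) return res.status(409).json(result); // only one active job per user
      res.status(202).json({ jobId: result.jobId });
    });

    // GET /api/jobs/:jobId - poll job status and progress
    router.get('/api/jobs/:jobId', requireAuth, async (req, res) => {
      const job = await getJobStatus(Number(req.params.jobId));
      if (!job || job.userId !== req.user.id) return res.status(404).json({ error: 'Job not found' });
      res.json(job);
    });

    // DELETE /api/qsos/all - delete all QSOs for the authenticated user
    router.delete('/api/qsos/all', requireAuth, async (req, res) => {
      await deleteQSOs(req.user.id);
      res.json({ success: true });
    });
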
Frontend changes:
- Add job polling every 2 seconds during sync
- Show real-time progress indicator during sync
- Add "Clear All QSOs" button with type-to-confirm ("DELETE")
- Check for active job on mount to resume polling after refresh
- Clean up polling interval on component unmount
- Update API client with jobsAPI methods (getStatus, getActive, getRecent)
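
The polling behavior above, sketched as a React hook. The real component structure and the jobsAPI client path are assumptions based on this description; only the getStatus/getActive method names come from the commit:

    import { useEffect, useRef, useState } from 'react';
    import { jobsAPI } from '../api/client.js'; // assumed client location

    export function useSyncJobPolling() {
      const [job, setJob] = useState(null);
      const intervalRef = useRef(null);

      const stopPolling = () => {
        if (intervalRef.current) {
          clearInterval(intervalRef.current);
          intervalRef.current = null;
        }
      };

      const pollJob = (jobId) => {
        stopPolling();
        // Poll every 2 seconds until the job reaches a terminal state
        intervalRef.current = setInterval(async () => {
          const status = await jobsAPI.getStatus(jobId);
          setJob(status);
          if (status?.status === 'completed' || status?.status === 'failed') {
            stopPolling();
          }
        }, 2000);
      };

      useEffect(() => {
        // On mount, resume polling if a job is still active (e.g. after a page refresh)
        jobsAPI.getActive().then((active) => {
          if (active) pollJob(active.id);
        });
        // Clean up the interval when the component unmounts
        return stopPolling;
      }, []);

      return { job, pollJob };
    }
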
Database:
- Add sync_jobs table: id, userId, status, type, startedAt, completedAt,
  result, error, createdAt
- Foreign key to users table
- Path fix: now uses src/backend/award.db consistently
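
A hedged sketch of the sync_jobs table as a Drizzle SQLite schema matching the columns listed above. The schema file itself is not shown in this diff, so the snake_case column names and timestamp mode are assumptions:

    import { sqliteTable, integer, text } from 'drizzle-orm/sqlite-core';
    import { users } from './users.js'; // assumed location of the users table

    export const syncJobs = sqliteTable('sync_jobs', {
      id: integer('id').primaryKey({ autoIncrement: true }),
      userId: integer('user_id')
        .notNull()
        .references(() => users.id), // foreign key to users table
      status: text('status').notNull(), // pending | running | completed | failed
      type: text('type').notNull(), // e.g. lotw_sync
      startedAt: integer('started_at', { mode: 'timestamp' }),
      completedAt: integer('completed_at', { mode: 'timestamp' }),
      result: text('result'), // JSON string holding result/progress data
      error: text('error'),
      createdAt: integer('created_at', { mode: 'timestamp' }).notNull(),
    });
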
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

src/backend/services/job-queue.service.js (new file, 314 lines)
@@ -0,0 +1,314 @@
import { db } from '../config/database.js';
import { syncJobs } from '../db/schema/index.js';
import { eq, and, desc, or, lt } from 'drizzle-orm';

/**
 * Background Job Queue Service
 * Manages async jobs with database persistence
 */

// Job status constants
export const JobStatus = {
  PENDING: 'pending',
  RUNNING: 'running',
  COMPLETED: 'completed',
  FAILED: 'failed',
};

// Job type constants
export const JobType = {
  LOTW_SYNC: 'lotw_sync',
};

// In-memory job processor (for single-server deployment)
const activeJobs = new Map(); // jobId -> Promise
const jobProcessors = {
  [JobType.LOTW_SYNC]: null, // Will be set by lotw.service.js
};

/**
 * Register a job processor function
 * @param {string} type - Job type
 * @param {Function} processor - Async function that processes the job
 */
export function registerProcessor(type, processor) {
  jobProcessors[type] = processor;
}

/**
 * Enqueue a new job
 * @param {number} userId - User ID
 * @param {string} type - Job type
 * @param {Object} data - Job data (will be passed to processor)
 * @returns {Promise<Object>} Job object with ID
 */
export async function enqueueJob(userId, type, data = {}) {
  console.error('[enqueueJob] Starting job enqueue:', { userId, type, hasData: !!data });

  // Check for existing active job of same type for this user
  const existingJob = await getUserActiveJob(userId, type);
  if (existingJob) {
    console.error('[enqueueJob] Found existing active job:', existingJob.id);
    return {
      success: false,
      error: `A ${type} job is already running or pending for this user`,
      existingJob: existingJob.id,
    };
  }

  // Create job record
  console.error('[enqueueJob] Creating job record in database...');
  const [job] = await db
    .insert(syncJobs)
    .values({
      userId,
      type,
      status: JobStatus.PENDING,
      createdAt: new Date(),
    })
    .returning();

  console.error('[enqueueJob] Job created:', job.id);

  // Start processing asynchronously (don't await)
  processJobAsync(job.id, userId, type, data).catch((error) => {
    console.error(`[enqueueJob] Error processing job ${job.id}:`, error);
  });

  return {
    success: true,
    jobId: job.id,
    job: {
      id: job.id,
      type: job.type,
      status: job.status,
      createdAt: job.createdAt,
    },
  };
}

/**
 * Process a job asynchronously
 * @param {number} jobId - Job ID
 * @param {number} userId - User ID
 * @param {string} type - Job type
 * @param {Object} data - Job data
 */
async function processJobAsync(jobId, userId, type, data) {
  // Store the promise in activeJobs
  const jobPromise = (async () => {
    try {
      // Update status to running
      await updateJob(jobId, {
        status: JobStatus.RUNNING,
        startedAt: new Date(),
      });

      // Get the processor for this job type
      const processor = jobProcessors[type];
      if (!processor) {
        throw new Error(`No processor registered for job type: ${type}`);
      }

      // Execute the job processor
      const result = await processor(jobId, userId, data);

      // Update job as completed
      await updateJob(jobId, {
        status: JobStatus.COMPLETED,
        completedAt: new Date(),
        result: JSON.stringify(result),
      });

      return result;
    } catch (error) {
      // Update job as failed
      await updateJob(jobId, {
        status: JobStatus.FAILED,
        completedAt: new Date(),
        error: error.message,
      });

      throw error;
    } finally {
      // Remove from active jobs
      activeJobs.delete(jobId);
    }
  })();

  activeJobs.set(jobId, jobPromise);
  return jobPromise;
}

/**
 * Update job record
 * @param {number} jobId - Job ID
 * @param {Object} updates - Fields to update
 */
export async function updateJob(jobId, updates) {
  await db.update(syncJobs).set(updates).where(eq(syncJobs.id, jobId));
}

/**
 * Get job by ID
 * @param {number} jobId - Job ID
 * @returns {Promise<Object|null>} Job object or null
 */
export async function getJob(jobId) {
  const [job] = await db.select().from(syncJobs).where(eq(syncJobs.id, jobId)).limit(1);
  return job || null;
}

/**
 * Get job status (with parsed result if completed)
 * @param {number} jobId - Job ID
 * @returns {Promise<Object|null>} Job object with parsed result
 */
export async function getJobStatus(jobId) {
  const job = await getJob(jobId);
  if (!job) return null;

  // Parse result JSON if completed
  let parsedResult = null;
  if (job.status === JobStatus.COMPLETED && job.result) {
    try {
      parsedResult = JSON.parse(job.result);
    } catch (e) {
      console.error('Failed to parse job result:', e);
    }
  }

  return {
    id: job.id,
    userId: job.userId, // Include userId for permission checks
    type: job.type,
    status: job.status,
    startedAt: job.startedAt,
    completedAt: job.completedAt,
    result: parsedResult,
    error: job.error,
    createdAt: job.createdAt,
  };
}

/**
 * Get user's active job (pending or running) of a specific type
 * @param {number} userId - User ID
 * @param {string} type - Job type (optional, returns any active job)
 * @returns {Promise<Object|null>} Active job or null
 */
export async function getUserActiveJob(userId, type = null) {
  console.error('[getUserActiveJob] Querying for active job:', { userId, type });

  // Build the where clause properly with and() and or()
  const conditions = [
    eq(syncJobs.userId, userId),
    or(
      eq(syncJobs.status, JobStatus.PENDING),
      eq(syncJobs.status, JobStatus.RUNNING)
    ),
  ];

  if (type) {
    conditions.push(eq(syncJobs.type, type));
  }

  try {
    const [job] = await db
      .select()
      .from(syncJobs)
      .where(and(...conditions))
      .orderBy(desc(syncJobs.createdAt))
      .limit(1);

    console.error('[getUserActiveJob] Result:', job ? `Found job ${job.id}` : 'No active job');
    return job || null;
  } catch (error) {
    console.error('[getUserActiveJob] Database error:', error);
    throw error;
  }
}

/**
 * Get recent jobs for a user
 * @param {number} userId - User ID
 * @param {number} limit - Maximum number of jobs to return
 * @returns {Promise<Array>} Array of jobs
 */
export async function getUserJobs(userId, limit = 10) {
  const jobs = await db
    .select()
    .from(syncJobs)
    .where(eq(syncJobs.userId, userId))
    .orderBy(desc(syncJobs.createdAt))
    .limit(limit);

  return jobs.map((job) => {
    let parsedResult = null;
    if (job.status === JobStatus.COMPLETED && job.result) {
      try {
        parsedResult = JSON.parse(job.result);
      } catch (e) {
        // Ignore parse errors
      }
    }

    return {
      id: job.id,
      type: job.type,
      status: job.status,
      startedAt: job.startedAt,
      completedAt: job.completedAt,
      result: parsedResult,
      error: job.error,
      createdAt: job.createdAt,
    };
  });
}

/**
 * Delete old completed jobs (cleanup)
 * @param {number} daysOld - Delete jobs older than this many days
 * @returns {Promise<number>} Number of jobs deleted
 */
export async function cleanupOldJobs(daysOld = 7) {
  const cutoffDate = new Date();
  cutoffDate.setDate(cutoffDate.getDate() - daysOld);

  const result = await db
    .delete(syncJobs)
    .where(
      and(
        eq(syncJobs.status, JobStatus.COMPLETED),
        lt(syncJobs.completedAt, cutoffDate)
      )
    );

  return result;
}

/**
 * Update job progress (for long-running jobs)
 * @param {number} jobId - Job ID
 * @param {Object} progressData - Progress data to store in result field
 */
export async function updateJobProgress(jobId, progressData) {
  const job = await getJob(jobId);
  if (!job) return;

  let currentData = {};
  if (job.result) {
    try {
      currentData = JSON.parse(job.result);
    } catch (e) {
      // Start fresh if invalid JSON
    }
  }

  // Merge progress data
  const updatedData = { ...currentData, ...progressData, progress: true };

  await updateJob(jobId, {
    result: JSON.stringify(updatedData),
  });
}

@@ -1,11 +1,16 @@
import { db } from '../config/database.js';
import { qsos } from '../db/schema/index.js';
import { max, sql } from 'drizzle-orm';
import { registerProcessor, updateJobProgress } from './job-queue.service.js';

/**
 * LoTW (Logbook of the World) Service
 * Fetches QSOs from ARRL's LoTW system
 */

// Wavelog-compatible constants
const LOTW_CONNECT_TIMEOUT = 30; // CURLOPT_CONNECTTIMEOUT from Wavelog

// Configuration for long-polling
const POLLING_CONFIG = {
  maxRetries: 30, // Maximum number of retry attempts

@@ -154,13 +159,21 @@ export async function fetchQSOsFromLoTW(lotwUsername, lotwPassword, sinceDate =
    const adifData = await response.text();
    console.error(`Response length: ${adifData.length} bytes`);

-   // Check if report is still pending
-   if (isReportPending(adifData)) {
-     console.error('LoTW report is still being prepared, waiting...', adifData.substring(0, 100));
-
-     // Wait before retrying
-     await sleep(POLLING_CONFIG.retryDelay);
-     continue;
-   }
+   // Wavelog: Validate response for credential errors
+   if (adifData.toLowerCase().includes('username/password incorrect')) {
+     throw new Error('Username/password incorrect');
+   }
+
+   // Wavelog: Check if file starts with expected header
+   const header = adifData.trim().substring(0, 39).toLowerCase();
+   if (!header.includes('arrl logbook of the world')) {
+     // This might be because the report is still pending
+     if (isReportPending(adifData)) {
+       console.error('LoTW report is still being prepared, waiting...', adifData.substring(0, 100));
+       await sleep(POLLING_CONFIG.retryDelay);
+       continue;
+     }
+     throw new Error('Downloaded LoTW report is invalid. Check your credentials.');
+   }

    // We have valid data!

@@ -529,3 +542,200 @@ export async function getQSOStats(userId) {

  return stats;
}

/**
 * Get the date of the last LoTW QSL for a user
 * Used for qso_qslsince parameter to minimize downloads
 * @param {number} userId - User ID
 * @returns {Promise<Date|null>} Last QSL date or null
 */
export async function getLastLoTWQSLDate(userId) {
  const { eq } = await import('drizzle-orm');

  // Get the most recent lotwQslRdate for this user
  const [result] = await db
    .select({ maxDate: max(qsos.lotwQslRdate) })
    .from(qsos)
    .where(eq(qsos.userId, userId));

  if (!result || !result.maxDate) {
    return null;
  }

  // Parse ADIF date format (YYYYMMDD) to Date
  const dateStr = result.maxDate;
  if (!dateStr || dateStr === '') {
    return null;
  }

  const year = dateStr.substring(0, 4);
  const month = dateStr.substring(4, 6);
  const day = dateStr.substring(6, 8);

  return new Date(`${year}-${month}-${day}`);
}

/**
 * Validate LoTW response following Wavelog logic
 * @param {string} responseData - Response from LoTW
 * @returns {Object} { valid: boolean, error?: string }
 */
function validateLoTWResponse(responseData) {
  const trimmed = responseData.trim();

  // Wavelog: Check for username/password incorrect
  if (trimmed.toLowerCase().includes('username/password incorrect')) {
    return {
      valid: false,
      error: 'Username/password incorrect',
      shouldClearCredentials: true,
    };
  }

  // Wavelog: Check if file starts with "ARRL Logbook of the World Status Report"
  const header = trimmed.substring(0, 39).toLowerCase();
  if (!header.includes('arrl logbook of the world')) {
    return {
      valid: false,
      error: 'Downloaded LoTW report is invalid. File does not start with expected header.',
    };
  }

  return { valid: true };
}

/**
 * LoTW sync job processor for the job queue
 * @param {number} jobId - Job ID
 * @param {number} userId - User ID
 * @param {Object} data - Job data { lotwUsername, lotwPassword }
 * @returns {Promise<Object>} Sync result
 */
export async function syncQSOsForJob(jobId, userId, data) {
  const { lotwUsername, lotwPassword } = data;

  try {
    // Update job progress: starting
    await updateJobProgress(jobId, {
      message: 'Fetching QSOs from LoTW...',
      step: 'fetch',
    });

    // Get last LoTW QSL date for incremental sync
    const lastQSLDate = await getLastLoTWQSLDate(userId);
    const sinceDate = lastQSLDate || new Date('2026-01-01'); // Default as per Wavelog

    console.error(`[Job ${jobId}] Syncing LoTW QSOs since ${sinceDate.toISOString().split('T')[0]}`);

    // Fetch from LoTW
    const adifQSOs = await fetchQSOsFromLoTW(lotwUsername, lotwPassword, sinceDate);

    if (!adifQSOs || adifQSOs.length === 0) {
      return {
        success: true,
        total: 0,
        added: 0,
        updated: 0,
        message: 'No QSOs found in LoTW',
      };
    }

    // Update job progress: processing
    await updateJobProgress(jobId, {
      message: `Processing ${adifQSOs.length} QSOs...`,
      step: 'process',
      total: adifQSOs.length,
      processed: 0,
    });

    let addedCount = 0;
    let updatedCount = 0;
    const errors = [];

    // Process each QSO
    for (let i = 0; i < adifQSOs.length; i++) {
      const qsoData = adifQSOs[i];

      try {
        const dbQSO = convertQSODatabaseFormat(qsoData, userId);

        // Check if QSO already exists
        const { eq, and } = await import('drizzle-orm');
        const existing = await db
          .select()
          .from(qsos)
          .where(
            and(
              eq(qsos.userId, userId),
              eq(qsos.callsign, dbQSO.callsign),
              eq(qsos.qsoDate, dbQSO.qsoDate),
              eq(qsos.band, dbQSO.band),
              eq(qsos.mode, dbQSO.mode)
            )
          )
          .limit(1);

        if (existing.length > 0) {
          // Update existing QSO
          await db
            .update(qsos)
            .set({
              lotwQslRdate: dbQSO.lotwQslRdate,
              lotwQslRstatus: dbQSO.lotwQslRstatus,
              lotwSyncedAt: dbQSO.lotwSyncedAt,
            })
            .where(eq(qsos.id, existing[0].id));
          updatedCount++;
        } else {
          // Insert new QSO
          await db.insert(qsos).values(dbQSO);
          addedCount++;
        }

        // Update progress every 10 QSOs
        if ((i + 1) % 10 === 0) {
          await updateJobProgress(jobId, {
            processed: i + 1,
            message: `Processed ${i + 1}/${adifQSOs.length} QSOs...`,
          });
        }
      } catch (error) {
        console.error(`[Job ${jobId}] ERROR processing QSO:`, error);
        errors.push({
          qso: qsoData,
          error: error.message,
        });
      }
    }

    return {
      success: true,
      total: adifQSOs.length,
      added: addedCount,
      updated: updatedCount,
      errors: errors.length > 0 ? errors : undefined,
    };
  } catch (error) {
    // Check if it's a credential error
    if (error.message.includes('Username/password incorrect')) {
      throw new Error('Invalid LoTW credentials. Please check your username and password.');
    }
    throw error;
  }
}

/**
 * Delete all QSOs for a user
 * @param {number} userId - User ID
 * @returns {Promise<number>} Number of QSOs deleted
 */
export async function deleteQSOs(userId) {
  const { eq } = await import('drizzle-orm');

  const result = await db.delete(qsos).where(eq(qsos.userId, userId));

  return result;
}

// Register the LoTW sync processor with the job queue
registerProcessor('lotw_sync', syncQSOsForJob);