diff --git a/src/backend/services/dcl.service.js b/src/backend/services/dcl.service.js index 35acdd7..dd9916c 100644 --- a/src/backend/services/dcl.service.js +++ b/src/backend/services/dcl.service.js @@ -170,7 +170,22 @@ function convertQSODatabaseFormat(adifQSO, userId) { } /** - * Sync QSOs from DCL to database + * Yield to event loop to allow other requests to be processed + * This prevents blocking the server during long-running sync operations + */ +function yieldToEventLoop() { + return new Promise(resolve => setImmediate(resolve)); +} + +/** + * Get QSO key for duplicate detection + */ +function getQSOKey(qso) { + return `${qso.callsign}|${qso.qsoDate}|${qso.timeOn}|${qso.band}|${qso.mode}`; +} + +/** + * Sync QSOs from DCL to database (optimized with batch operations) * Updates existing QSOs with DCL confirmation data * * @param {number} userId - User ID @@ -219,181 +234,215 @@ export async function syncQSOs(userId, dclApiKey, sinceDate = null, jobId = null const addedQSOs = []; const updatedQSOs = []; - for (let i = 0; i < adifQSOs.length; i++) { - const adifQSO = adifQSOs[i]; + // Convert all QSOs to database format + const dbQSOs = adifQSOs.map(qso => convertQSODatabaseFormat(qso, userId)); - try { - const dbQSO = convertQSODatabaseFormat(adifQSO, userId); + // Batch size for processing + const BATCH_SIZE = 100; + const totalBatches = Math.ceil(dbQSOs.length / BATCH_SIZE); - // Check if QSO already exists (match by callsign, date, time, band, mode) - const existing = await db - .select() - .from(qsos) - .where( - and( - eq(qsos.userId, userId), - eq(qsos.callsign, dbQSO.callsign), - eq(qsos.qsoDate, dbQSO.qsoDate), - eq(qsos.timeOn, dbQSO.timeOn), - eq(qsos.band, dbQSO.band), - eq(qsos.mode, dbQSO.mode) - ) + for (let batchNum = 0; batchNum < totalBatches; batchNum++) { + const startIdx = batchNum * BATCH_SIZE; + const endIdx = Math.min(startIdx + BATCH_SIZE, dbQSOs.length); + const batch = dbQSOs.slice(startIdx, endIdx); + + // Get unique callsigns and dates from batch + const batchCallsigns = [...new Set(batch.map(q => q.callsign))]; + const batchDates = [...new Set(batch.map(q => q.qsoDate))]; + + // Fetch all existing QSOs that could match this batch in one query + const existingQSOs = await db + .select() + .from(qsos) + .where( + and( + eq(qsos.userId, userId), + // Match callsigns OR dates from this batch + sql`(${qsos.callsign} IN ${batchCallsigns} OR ${qsos.qsoDate} IN ${batchDates})` ) - .limit(1); + ); - if (existing.length > 0) { - const existingQSO = existing[0]; + // Build lookup map for existing QSOs + const existingMap = new Map(); + for (const existing of existingQSOs) { + const key = getQSOKey(existing); + existingMap.set(key, existing); + } - // Check if DCL confirmation or DOK data has changed - const dataChanged = - existingQSO.dclQslRstatus !== dbQSO.dclQslRstatus || - existingQSO.dclQslRdate !== dbQSO.dclQslRdate || - existingQSO.darcDok !== (dbQSO.darcDok || existingQSO.darcDok) || - existingQSO.myDarcDok !== (dbQSO.myDarcDok || existingQSO.myDarcDok) || - existingQSO.grid !== (dbQSO.grid || existingQSO.grid); + // Process batch + const toInsert = []; + const toUpdate = []; + const changeRecords = []; - if (dataChanged) { - // Record before state for rollback - const beforeData = JSON.stringify({ - dclQslRstatus: existingQSO.dclQslRstatus, - dclQslRdate: existingQSO.dclQslRdate, - darcDok: existingQSO.darcDok, - myDarcDok: existingQSO.myDarcDok, - grid: existingQSO.grid, - gridSource: existingQSO.gridSource, - entity: existingQSO.entity, - entityId: existingQSO.entityId, - }); + for (const dbQSO of batch) { + try { + const key = getQSOKey(dbQSO); + const existingQSO = existingMap.get(key); - // Update existing QSO with changed DCL confirmation and DOK data - const updateData = { - dclQslRdate: dbQSO.dclQslRdate, - dclQslRstatus: dbQSO.dclQslRstatus, - }; + if (existingQSO) { + // Check if DCL confirmation or DOK data has changed + const dataChanged = + existingQSO.dclQslRstatus !== dbQSO.dclQslRstatus || + existingQSO.dclQslRdate !== dbQSO.dclQslRdate || + existingQSO.darcDok !== (dbQSO.darcDok || existingQSO.darcDok) || + existingQSO.myDarcDok !== (dbQSO.myDarcDok || existingQSO.myDarcDok) || + existingQSO.grid !== (dbQSO.grid || existingQSO.grid); - // Only add DOK fields if DCL sent them - if (dbQSO.darcDok) updateData.darcDok = dbQSO.darcDok; - if (dbQSO.myDarcDok) updateData.myDarcDok = dbQSO.myDarcDok; + if (dataChanged) { + // Build update data + const updateData = { + dclQslRdate: dbQSO.dclQslRdate, + dclQslRstatus: dbQSO.dclQslRstatus, + }; - // Only update grid if DCL sent one - if (dbQSO.grid) { - updateData.grid = dbQSO.grid; - updateData.gridSource = dbQSO.gridSource; - } + // Only add DOK fields if DCL sent them + if (dbQSO.darcDok) updateData.darcDok = dbQSO.darcDok; + if (dbQSO.myDarcDok) updateData.myDarcDok = dbQSO.myDarcDok; - // DXCC priority: LoTW > DCL - // Only update entity fields from DCL if: - // 1. QSO is NOT LoTW confirmed, AND - // 2. DCL actually sent entity data, AND - // 3. Current entity is missing - const hasLoTWConfirmation = existingQSO.lotwQslRstatus === 'Y'; - const hasDCLData = dbQSO.entity || dbQSO.entityId; - const missingEntity = !existingQSO.entity || existingQSO.entity === ''; + // Only update grid if DCL sent one + if (dbQSO.grid) { + updateData.grid = dbQSO.grid; + updateData.gridSource = dbQSO.gridSource; + } - if (!hasLoTWConfirmation && hasDCLData && missingEntity) { - // Fill in entity data from DCL (only if DCL provides it) - if (dbQSO.entity) updateData.entity = dbQSO.entity; - if (dbQSO.entityId) updateData.entityId = dbQSO.entityId; - if (dbQSO.continent) updateData.continent = dbQSO.continent; - if (dbQSO.cqZone) updateData.cqZone = dbQSO.cqZone; - if (dbQSO.ituZone) updateData.ituZone = dbQSO.ituZone; - } + // DXCC priority: LoTW > DCL + // Only update entity fields from DCL if: + // 1. QSO is NOT LoTW confirmed, AND + // 2. DCL actually sent entity data, AND + // 3. Current entity is missing + const hasLoTWConfirmation = existingQSO.lotwQslRstatus === 'Y'; + const hasDCLData = dbQSO.entity || dbQSO.entityId; + const missingEntity = !existingQSO.entity || existingQSO.entity === ''; - await db - .update(qsos) - .set(updateData) - .where(eq(qsos.id, existingQSO.id)); + if (!hasLoTWConfirmation && hasDCLData && missingEntity) { + if (dbQSO.entity) updateData.entity = dbQSO.entity; + if (dbQSO.entityId) updateData.entityId = dbQSO.entityId; + if (dbQSO.continent) updateData.continent = dbQSO.continent; + if (dbQSO.cqZone) updateData.cqZone = dbQSO.cqZone; + if (dbQSO.ituZone) updateData.ituZone = dbQSO.ituZone; + } - // Record after state for rollback - const afterData = JSON.stringify({ - dclQslRstatus: dbQSO.dclQslRstatus, - dclQslRdate: dbQSO.dclQslRdate, - darcDok: updateData.darcDok, - myDarcDok: updateData.myDarcDok, - grid: updateData.grid, - gridSource: updateData.gridSource, - entity: updateData.entity, - entityId: updateData.entityId, - }); - - // Track change in qso_changes table if jobId provided - if (jobId) { - await db.insert(qsoChanges).values({ - jobId, - qsoId: existingQSO.id, - changeType: 'updated', - beforeData, - afterData, + toUpdate.push({ + id: existingQSO.id, + data: updateData, }); - } - updatedCount++; - // Track updated QSO (CALL and DATE) - updatedQSOs.push({ - id: existingQSO.id, + // Track change for rollback + if (jobId) { + changeRecords.push({ + jobId, + qsoId: existingQSO.id, + changeType: 'updated', + beforeData: JSON.stringify({ + dclQslRstatus: existingQSO.dclQslRstatus, + dclQslRdate: existingQSO.dclQslRdate, + darcDok: existingQSO.darcDok, + myDarcDok: existingQSO.myDarcDok, + grid: existingQSO.grid, + gridSource: existingQSO.gridSource, + entity: existingQSO.entity, + entityId: existingQSO.entityId, + }), + afterData: JSON.stringify({ + dclQslRstatus: dbQSO.dclQslRstatus, + dclQslRdate: dbQSO.dclQslRdate, + darcDok: updateData.darcDok, + myDarcDok: updateData.myDarcDok, + grid: updateData.grid, + gridSource: updateData.gridSource, + entity: updateData.entity, + entityId: updateData.entityId, + }), + }); + } + + updatedQSOs.push({ + id: existingQSO.id, + callsign: dbQSO.callsign, + date: dbQSO.qsoDate, + band: dbQSO.band, + mode: dbQSO.mode, + }); + updatedCount++; + } else { + skippedCount++; + } + } else { + // New QSO to insert + toInsert.push(dbQSO); + addedQSOs.push({ callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, mode: dbQSO.mode, }); - } else { - // Skip - same data - skippedCount++; + addedCount++; } - } else { - // Insert new QSO - const [newQSO] = await db.insert(qsos).values(dbQSO).returning(); + } catch (error) { + logger.error('Failed to process DCL QSO in batch', { + error: error.message, + qso: dbQSO, + userId, + }); + errors.push({ qso: dbQSO, error: error.message }); + } + } - // Track change in qso_changes table if jobId provided - if (jobId) { - const afterData = JSON.stringify({ - callsign: dbQSO.callsign, - qsoDate: dbQSO.qsoDate, - timeOn: dbQSO.timeOn, - band: dbQSO.band, - mode: dbQSO.mode, - }); - - await db.insert(qsoChanges).values({ + // Batch insert new QSOs + if (toInsert.length > 0) { + const inserted = await db.insert(qsos).values(toInsert).returning(); + // Track inserted QSOs with their IDs for change tracking + if (jobId) { + for (let i = 0; i < inserted.length; i++) { + changeRecords.push({ jobId, - qsoId: newQSO.id, + qsoId: inserted[i].id, changeType: 'added', beforeData: null, - afterData, + afterData: JSON.stringify({ + callsign: toInsert[i].callsign, + qsoDate: toInsert[i].qsoDate, + timeOn: toInsert[i].timeOn, + band: toInsert[i].band, + mode: toInsert[i].mode, + }), }); + // Update addedQSOs with actual IDs + addedQSOs[addedCount - inserted.length + i].id = inserted[i].id; } - - addedCount++; - // Track added QSO (CALL and DATE) - addedQSOs.push({ - id: newQSO.id, - callsign: dbQSO.callsign, - date: dbQSO.qsoDate, - band: dbQSO.band, - mode: dbQSO.mode, - }); } - - // Update job progress every 10 QSOs - if (jobId && (i + 1) % 10 === 0) { - await updateJobProgress(jobId, { - processed: i + 1, - message: `Processed ${i + 1}/${adifQSOs.length} QSOs from DCL...`, - }); - } - } catch (error) { - logger.error('Failed to process DCL QSO', { - error: error.message, - qso: adifQSO, - userId, - }); - errors.push({ qso: adifQSO, error: error.message }); } + + // Batch update existing QSOs + if (toUpdate.length > 0) { + for (const update of toUpdate) { + await db + .update(qsos) + .set(update.data) + .where(eq(qsos.id, update.id)); + } + } + + // Batch insert change records + if (changeRecords.length > 0) { + await db.insert(qsoChanges).values(changeRecords); + } + + // Update job progress after each batch + if (jobId) { + await updateJobProgress(jobId, { + processed: endIdx, + message: `Processed ${endIdx}/${dbQSOs.length} QSOs from DCL...`, + }); + } + + // Yield to event loop after each batch to allow other requests + await yieldToEventLoop(); } const result = { success: true, - total: adifQSOs.length, + total: dbQSOs.length, added: addedCount, updated: updatedCount, skipped: skippedCount, diff --git a/src/backend/services/lotw.service.js b/src/backend/services/lotw.service.js index c61383d..bf4fc54 100644 --- a/src/backend/services/lotw.service.js +++ b/src/backend/services/lotw.service.js @@ -211,7 +211,22 @@ function convertQSODatabaseFormat(adifQSO, userId) { } /** - * Sync QSOs from LoTW to database + * Yield to event loop to allow other requests to be processed + * This prevents blocking the server during long-running sync operations + */ +function yieldToEventLoop() { + return new Promise(resolve => setImmediate(resolve)); +} + +/** + * Get QSO key for duplicate detection + */ +function getQSOKey(qso) { + return `${qso.callsign}|${qso.qsoDate}|${qso.timeOn}|${qso.band}|${qso.mode}`; +} + +/** + * Sync QSOs from LoTW to database (optimized with batch operations) * @param {number} userId - User ID * @param {string} lotwUsername - LoTW username * @param {string} lotwPassword - LoTW password @@ -258,129 +273,168 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n const addedQSOs = []; const updatedQSOs = []; - for (let i = 0; i < adifQSOs.length; i++) { - const qsoData = adifQSOs[i]; + // Convert all QSOs to database format + const dbQSOs = adifQSOs.map(qsoData => convertQSODatabaseFormat(qsoData, userId)); - try { - const dbQSO = convertQSODatabaseFormat(qsoData, userId); + // Batch size for processing + const BATCH_SIZE = 100; + const totalBatches = Math.ceil(dbQSOs.length / BATCH_SIZE); - const existing = await db - .select() - .from(qsos) - .where( - and( - eq(qsos.userId, userId), - eq(qsos.callsign, dbQSO.callsign), - eq(qsos.qsoDate, dbQSO.qsoDate), - eq(qsos.timeOn, dbQSO.timeOn), - eq(qsos.band, dbQSO.band), - eq(qsos.mode, dbQSO.mode) - ) + for (let batchNum = 0; batchNum < totalBatches; batchNum++) { + const startIdx = batchNum * BATCH_SIZE; + const endIdx = Math.min(startIdx + BATCH_SIZE, dbQSOs.length); + const batch = dbQSOs.slice(startIdx, endIdx); + + // Build condition for batch duplicate check + // Get unique callsigns, dates, bands, modes from batch + const batchCallsigns = [...new Set(batch.map(q => q.callsign))]; + const batchDates = [...new Set(batch.map(q => q.qsoDate))]; + + // Fetch all existing QSOs that could match this batch in one query + const existingQSOs = await db + .select() + .from(qsos) + .where( + and( + eq(qsos.userId, userId), + // Match callsigns OR dates from this batch + sql`(${qsos.callsign} IN ${batchCallsigns} OR ${qsos.qsoDate} IN ${batchDates})` ) - .limit(1); + ); - if (existing.length > 0) { - const existingQSO = existing[0]; + // Build lookup map for existing QSOs + const existingMap = new Map(); + for (const existing of existingQSOs) { + const key = getQSOKey(existing); + existingMap.set(key, existing); + } - // Check if LoTW confirmation data has changed - const confirmationChanged = - existingQSO.lotwQslRstatus !== dbQSO.lotwQslRstatus || - existingQSO.lotwQslRdate !== dbQSO.lotwQslRdate; + // Process batch + const toInsert = []; + const toUpdate = []; + const changeRecords = []; - if (confirmationChanged) { - // Record before state for rollback - const beforeData = JSON.stringify({ - lotwQslRstatus: existingQSO.lotwQslRstatus, - lotwQslRdate: existingQSO.lotwQslRdate, - }); + for (const dbQSO of batch) { + try { + const key = getQSOKey(dbQSO); + const existingQSO = existingMap.get(key); - await db - .update(qsos) - .set({ + if (existingQSO) { + // Check if LoTW confirmation data has changed + const confirmationChanged = + existingQSO.lotwQslRstatus !== dbQSO.lotwQslRstatus || + existingQSO.lotwQslRdate !== dbQSO.lotwQslRdate; + + if (confirmationChanged) { + toUpdate.push({ + id: existingQSO.id, lotwQslRdate: dbQSO.lotwQslRdate, lotwQslRstatus: dbQSO.lotwQslRstatus, lotwSyncedAt: dbQSO.lotwSyncedAt, - }) - .where(eq(qsos.id, existingQSO.id)); - - // Record after state for rollback - const afterData = JSON.stringify({ - lotwQslRstatus: dbQSO.lotwQslRstatus, - lotwQslRdate: dbQSO.lotwQslRdate, - }); - - // Track change in qso_changes table if jobId provided - if (jobId) { - await db.insert(qsoChanges).values({ - jobId, - qsoId: existingQSO.id, - changeType: 'updated', - beforeData, - afterData, }); - } - updatedCount++; - // Track updated QSO (CALL and DATE) - updatedQSOs.push({ - id: existingQSO.id, + // Track change for rollback + if (jobId) { + changeRecords.push({ + jobId, + qsoId: existingQSO.id, + changeType: 'updated', + beforeData: JSON.stringify({ + lotwQslRstatus: existingQSO.lotwQslRstatus, + lotwQslRdate: existingQSO.lotwQslRdate, + }), + afterData: JSON.stringify({ + lotwQslRstatus: dbQSO.lotwQslRstatus, + lotwQslRdate: dbQSO.lotwQslRdate, + }), + }); + } + + updatedQSOs.push({ + id: existingQSO.id, + callsign: dbQSO.callsign, + date: dbQSO.qsoDate, + band: dbQSO.band, + mode: dbQSO.mode, + }); + updatedCount++; + } else { + skippedCount++; + } + } else { + // New QSO to insert + toInsert.push(dbQSO); + addedQSOs.push({ callsign: dbQSO.callsign, date: dbQSO.qsoDate, band: dbQSO.band, mode: dbQSO.mode, }); - } else { - // Skip - same data - skippedCount++; + addedCount++; } - } else { - // Insert new QSO - const [newQSO] = await db.insert(qsos).values(dbQSO).returning(); + } catch (error) { + logger.error('Error processing QSO in batch', { error: error.message, jobId, qso: dbQSO }); + errors.push({ qso: dbQSO, error: error.message }); + } + } - // Track change in qso_changes table if jobId provided - if (jobId) { - const afterData = JSON.stringify({ - callsign: dbQSO.callsign, - qsoDate: dbQSO.qsoDate, - timeOn: dbQSO.timeOn, - band: dbQSO.band, - mode: dbQSO.mode, - }); - - await db.insert(qsoChanges).values({ + // Batch insert new QSOs + if (toInsert.length > 0) { + const inserted = await db.insert(qsos).values(toInsert).returning(); + // Track inserted QSOs with their IDs for change tracking + if (jobId) { + for (let i = 0; i < inserted.length; i++) { + changeRecords.push({ jobId, - qsoId: newQSO.id, + qsoId: inserted[i].id, changeType: 'added', beforeData: null, - afterData, + afterData: JSON.stringify({ + callsign: toInsert[i].callsign, + qsoDate: toInsert[i].qsoDate, + timeOn: toInsert[i].timeOn, + band: toInsert[i].band, + mode: toInsert[i].mode, + }), }); + // Update addedQSOs with actual IDs + addedQSOs[addedCount - inserted.length + i].id = inserted[i].id; } - - addedCount++; - // Track added QSO (CALL and DATE) - addedQSOs.push({ - id: newQSO.id, - callsign: dbQSO.callsign, - date: dbQSO.qsoDate, - band: dbQSO.band, - mode: dbQSO.mode, - }); } - - // Update job progress every 10 QSOs - if (jobId && (i + 1) % 10 === 0) { - await updateJobProgress(jobId, { - processed: i + 1, - message: `Processed ${i + 1}/${adifQSOs.length} QSOs...`, - }); - } - } catch (error) { - logger.error('Error processing QSO', { error: error.message, jobId, qso: qsoData }); - errors.push({ qso: qsoData, error: error.message }); } + + // Batch update existing QSOs + if (toUpdate.length > 0) { + for (const update of toUpdate) { + await db + .update(qsos) + .set({ + lotwQslRdate: update.lotwQslRdate, + lotwQslRstatus: update.lotwQslRstatus, + lotwSyncedAt: update.lotwSyncedAt, + }) + .where(eq(qsos.id, update.id)); + } + } + + // Batch insert change records + if (changeRecords.length > 0) { + await db.insert(qsoChanges).values(changeRecords); + } + + // Update job progress after each batch + if (jobId) { + await updateJobProgress(jobId, { + processed: endIdx, + message: `Processed ${endIdx}/${dbQSOs.length} QSOs...`, + }); + } + + // Yield to event loop after each batch to allow other requests + await yieldToEventLoop(); } - logger.info('LoTW sync completed', { total: adifQSOs.length, added: addedCount, updated: updatedCount, skipped: skippedCount, jobId }); + logger.info('LoTW sync completed', { total: dbQSOs.length, added: addedCount, updated: updatedCount, skipped: skippedCount, jobId }); // Invalidate award and stats cache for this user since QSOs may have changed const deletedCache = invalidateUserCache(userId); @@ -389,7 +443,7 @@ export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = n return { success: true, - total: adifQSOs.length, + total: dbQSOs.length, added: addedCount, updated: updatedCount, skipped: skippedCount,