import { db, logger } from '../config.js';
import { qsos, qsoChanges, syncJobs, awardProgress } from '../db/schema/index.js';
import { max, sql, eq, and, or, desc, like } from 'drizzle-orm';
import { updateJobProgress } from './job-queue.service.js';
import { parseADIF, normalizeBand, normalizeMode } from '../utils/adif-parser.js';
import { invalidateUserCache, getCachedStats, setCachedStats, invalidateStatsCache } from './cache.service.js';
import { trackQueryPerformance } from './performance.service.js';
import { yieldToEventLoop, getQSOKey } from '../utils/sync-helpers.js';

/**
 * LoTW (Logbook of the World) Service
 * Fetches QSOs from ARRL's LoTW system
 */

// Simplified polling configuration.
// Worst case the fetch loop sleeps MAX_RETRIES * RETRY_DELAY ms (30 * 10 s) in total.
const MAX_RETRIES = 30;
const RETRY_DELAY = 10000;
const REQUEST_TIMEOUT = 60000;

/**
 * SECURITY: Sanitize search input to prevent injection and DoS.
 * Limits length and removes potentially harmful characters.
 *
 * @param {*} searchTerm - Raw user-supplied search text
 * @returns {string} Cleaned term, or '' when the input is empty or not a string
 */
function sanitizeSearchInput(searchTerm) {
  if (!searchTerm || typeof searchTerm !== 'string') {
    return '';
  }

  // Trim whitespace
  let sanitized = searchTerm.trim();

  // Limit length (DoS prevention)
  const MAX_SEARCH_LENGTH = 100;
  if (sanitized.length > MAX_SEARCH_LENGTH) {
    sanitized = sanitized.substring(0, MAX_SEARCH_LENGTH);
  }

  // Remove SQL LIKE wildcards and the escape character from user input;
  // the caller adds its own wildcards for the LIKE query.
  // Note: Drizzle ORM escapes parameters, but this adds defense-in-depth
  sanitized = sanitized.replace(/[%_\\]/g, '');

  // Remove null bytes and other control characters
  sanitized = sanitized.replace(/[\x00-\x1F\x7F]/g, '');

  return sanitized;
}

/**
 * Check if LoTW response indicates the report is still being prepared.
 *
 * NOTE(review): the markup literals in this function were lost from the source
 * text (the checks read `includes('')`, which is always true, and the ADIF
 * marker check was truncated). The literals below are a reconstruction —
 * confirm against version control before relying on them.
 *
 * @param {string} responseData - Raw response body from LoTW
 * @returns {boolean} true when the response should be treated as "not ready yet"
 */
function isReportPending(responseData) {
  const trimmed = responseData.trim().toLowerCase();

  // Very short bodies are treated as "still pending"
  if (trimmed.length < 100) return true;

  // An HTML page instead of ADIF is treated as "still pending"
  // NOTE(review): reconstructed literals — TODO confirm
  if (trimmed.includes('<html') || trimmed.includes('<!doctype')) return true;

  const pendingMessages = [
    'report is being prepared',
    'your report is being generated',
    'please try again',
    'report queue',
    'not yet available',
    'temporarily unavailable',
  ];
  if (pendingMessages.some((msg) => trimmed.includes(msg))) return true;

  // NOTE(review): reconstructed ADIF marker check — TODO confirm the exact tags
  const hasAdifHeader =
    trimmed.includes('<adif_ver') ||
    trimmed.includes('<programid') ||
    trimmed.includes('<eoh>');

  return !hasAdifHeader;
}

/**
 * Resolve after `ms` milliseconds.
 * @param {number} ms - Delay in milliseconds
 * @returns {Promise<void>}
 */
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));

/**
 * Fetch QSOs from LoTW with retry support.
 *
 * @param {string} lotwUsername - LoTW login
 * @param {string} lotwPassword - LoTW password
 * @param {Date|null} sinceDate - When set, sent as `qso_qslsince` (YYYY-MM-DD)
 * @returns {Promise<Array|{error: string}>} Parsed QSOs from parseADIF, or an
 *   `{ error }` object for failures that are reported rather than thrown
 * @throws Re-throws the last fetch/parse error once retries are exhausted, and
 *   immediately for errors whose message mentions credentials, 401 or 404
 */
async function fetchQSOsFromLoTW(lotwUsername, lotwPassword, sinceDate = null) {
  const startTime = Date.now();
  const url = 'https://lotw.arrl.org/lotwuser/lotwreport.adi';

  const params = new URLSearchParams({
    login: lotwUsername,
    password: lotwPassword,
    qso_query: '1',
    qso_qsl: 'yes',
    qso_qsldetail: 'yes',
    qso_mydetail: 'yes',
    qso_withown: 'yes',
  });

  if (sinceDate) {
    const dateStr = sinceDate.toISOString().split('T')[0];
    params.append('qso_qslsince', dateStr);
    logger.debug('Incremental sync since', { date: dateStr });
  } else {
    logger.debug('Full sync - fetching all QSOs');
  }

  const fullUrl = `${url}?${params.toString()}`;
  // Never log the password: it travels in the query string
  logger.debug('Fetching from LoTW', { url: fullUrl.replace(/password=[^&]+/, 'password=***') });

  for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
    if (attempt > 0) {
      logger.debug(`Retry attempt ${attempt + 1}/${MAX_RETRIES}`);
    }

    try {
      const controller = new AbortController();
      const timeoutId = setTimeout(() => controller.abort(), REQUEST_TIMEOUT);

      let response;
      try {
        response = await fetch(fullUrl, { signal: controller.signal });
      } finally {
        // Always release the timer, including when fetch rejects
        clearTimeout(timeoutId);
      }

      if (!response.ok) {
        if (response.status === 503) {
          logger.warn('LoTW returned 503, retrying...');
          await sleep(RETRY_DELAY);
          continue;
        } else if (response.status === 401) {
          return { error: 'Invalid LoTW credentials. Please check your username and password in Settings.' };
        } else if (response.status === 404) {
          return { error: 'LoTW service not found (404). The LoTW API URL may have changed.' };
        } else {
          logger.warn(`LoTW returned ${response.status}, retrying...`);
          await sleep(RETRY_DELAY);
          continue;
        }
      }

      const adifData = await response.text();

      if (adifData.toLowerCase().includes('username/password incorrect')) {
        return { error: 'Username/password incorrect' };
      }

      // Only the first 39 characters are inspected for the report banner
      const header = adifData.trim().substring(0, 39).toLowerCase();
      if (!header.includes('arrl logbook of the world')) {
        if (isReportPending(adifData)) {
          logger.debug('LoTW report still being prepared, waiting...');
          await sleep(RETRY_DELAY);
          continue;
        }
        return { error: 'Downloaded LoTW report is invalid. Check your credentials.' };
      }

      logger.info('LoTW report downloaded successfully', { size: adifData.length });

      const parsedQSOs = parseADIF(adifData);
      logger.info('Parsed QSOs from LoTW', { count: parsedQSOs.length });
      return parsedQSOs;
    } catch (error) {
      if (error?.name === 'AbortError') {
        logger.debug('Request timeout, retrying...');
        await sleep(RETRY_DELAY);
        continue;
      }

      // Guard against non-Error throwables that carry no `message`
      const message = typeof error?.message === 'string' ? error.message : String(error);

      if (message.includes('credentials') || message.includes('401') || message.includes('404')) {
        throw error;
      }

      if (attempt < MAX_RETRIES - 1) {
        logger.warn(`Error on attempt ${attempt + 1}`, { error: message });
        await sleep(RETRY_DELAY);
        continue;
      } else {
        throw error;
      }
    }
  }

  const totalTime = Math.round((Date.now() - startTime) / 1000);
  return {
    error: `LoTW sync failed: Report not ready after ${MAX_RETRIES} attempts (${totalTime}s). LoTW may be experiencing high load. Please try again later.`
  };
}

/**
 * Parse a value as a base-10 integer.
 * @param {*} value - Raw ADIF field value
 * @returns {number|null} The integer, or null when the value is empty or not numeric
 */
function toIntOrNull(value) {
  if (!value) return null;
  const parsed = Number.parseInt(value, 10);
  return Number.isNaN(parsed) ? null : parsed;
}

/**
 * Convert ADIF QSO to database format.
 *
 * @param {Object} adifQSO - One record as produced by parseADIF (lower-case ADIF field names)
 * @param {number} userId - Owner of the QSO
 * @returns {Object} Row object for the `qsos` table
 */
function convertQSODatabaseFormat(adifQSO, userId) {
  return {
    userId,
    callsign: adifQSO.call || '',
    qsoDate: adifQSO.qso_date || '',
    timeOn: adifQSO.time_on || adifQSO.time_off || '000000',
    band: normalizeBand(adifQSO.band),
    mode: normalizeMode(adifQSO.mode),
    // NOTE(review): integer parsing drops any fractional part of the frequency
    // value — confirm that truncation is intended for this column.
    freq: toIntOrNull(adifQSO.freq),
    freqRx: toIntOrNull(adifQSO.freq_rx),
    entity: adifQSO.country || adifQSO.dxcc_country || '',
    entityId: adifQSO.dxcc || null,
    grid: adifQSO.gridsquare || '',
    continent: adifQSO.continent || '',
    cqZone: toIntOrNull(adifQSO.cq_zone),
    ituZone: toIntOrNull(adifQSO.itu_zone),
    state: adifQSO.state || adifQSO.us_state || '',
    satName: adifQSO.sat_name || '',
    satMode: adifQSO.sat_mode || '',
    lotwQslRdate: adifQSO.qslrdate || '',
    lotwQslRstatus: adifQSO.qsl_rcvd || 'N',
    lotwSyncedAt: new Date(),
  };
}

/**
 * Sync QSOs from LoTW to database (optimized with batch operations)
 * @param {number} userId - User ID
 * @param {string} lotwUsername - LoTW username
 * @param {string} lotwPassword - LoTW password
 * @param {Date|null} sinceDate - Optional date for incremental sync
 * @param {number|null} jobId - Optional job ID for progress tracking
 * @returns {Promise<Object>} Summary: success, total, added, updated, skipped,
 *   addedQSOs, updatedQSOs and (when any occurred) errors
 * @throws {Error} When the LoTW fetch reports an error
 */
export async function syncQSOs(userId, lotwUsername, lotwPassword, sinceDate = null, jobId = null) {
  if (jobId) {
    await updateJobProgress(jobId, {
      message: 'Fetching QSOs from LoTW...',
      step: 'fetch',
    });
  }

  const adifQSOs = await fetchQSOsFromLoTW(lotwUsername, lotwPassword, sinceDate);

  // Check for error response from LoTW fetch
  if (!adifQSOs) {
    return { success: false, error: 'Failed to fetch from LoTW', total: 0, added: 0, updated: 0 };
  }

  // If adifQSOs is an error object, throw it
  if (adifQSOs.error) {
    throw new Error(adifQSOs.error);
  }

  if (adifQSOs.length === 0) {
    return { success: true, total: 0, added: 0, updated: 0, message: 'No QSOs found in LoTW' };
  }

  if (jobId) {
    await updateJobProgress(jobId, {
      message: `Processing ${adifQSOs.length} QSOs...`,
      step: 'process',
      total: adifQSOs.length,
      processed: 0,
    });
  }

  let addedCount = 0;
  let updatedCount = 0;
  let skippedCount = 0;
  const errors = [];
  const addedQSOs = [];
  const updatedQSOs = [];

  // Convert all QSOs to database format
  const dbQSOs = adifQSOs.map(qsoData => convertQSODatabaseFormat(qsoData, userId));

  // Batch size for processing
  const BATCH_SIZE = 100;
  const totalBatches = Math.ceil(dbQSOs.length / BATCH_SIZE);

  for (let batchNum = 0; batchNum < totalBatches; batchNum++) {
    const startIdx = batchNum * BATCH_SIZE;
    const endIdx = Math.min(startIdx + BATCH_SIZE, dbQSOs.length);
    const batch = dbQSOs.slice(startIdx, endIdx);

    // Unique callsigns and dates from this batch narrow the duplicate lookup
    const batchCallsigns = [...new Set(batch.map(q => q.callsign))];
    const batchDates = [...new Set(batch.map(q => q.qsoDate))];

    // Fetch all existing QSOs that could match this batch in one query
    const existingQSOs = await db
      .select()
      .from(qsos)
      .where(
        and(
          eq(qsos.userId, userId),
          // Match callsigns OR dates from this batch
          sql`(${qsos.callsign} IN ${batchCallsigns} OR ${qsos.qsoDate} IN ${batchDates})`
        )
      );

    // Build lookup map for existing QSOs
    const existingMap = new Map();
    for (const existing of existingQSOs) {
      existingMap.set(getQSOKey(existing), existing);
    }

    // Process batch
    const toInsert = [];
    const toUpdate = [];
    const changeRecords = [];
    // Summary entries for this batch's inserts, index-aligned with toInsert
    const batchAdded = [];
    // Keys already queued for insert in this batch. existingMap is built before
    // the loop, so without this a key repeated inside one batch would be
    // inserted twice.
    const pendingKeys = new Set();

    for (const dbQSO of batch) {
      try {
        const key = getQSOKey(dbQSO);
        const existingQSO = existingMap.get(key);

        if (existingQSO) {
          // Check if LoTW confirmation data has changed
          const confirmationChanged =
            existingQSO.lotwQslRstatus !== dbQSO.lotwQslRstatus ||
            existingQSO.lotwQslRdate !== dbQSO.lotwQslRdate;

          if (confirmationChanged) {
            toUpdate.push({
              id: existingQSO.id,
              lotwQslRdate: dbQSO.lotwQslRdate,
              lotwQslRstatus: dbQSO.lotwQslRstatus,
              lotwSyncedAt: dbQSO.lotwSyncedAt,
            });

            // Track change for rollback
            if (jobId) {
              changeRecords.push({
                jobId,
                qsoId: existingQSO.id,
                changeType: 'updated',
                beforeData: JSON.stringify({
                  lotwQslRstatus: existingQSO.lotwQslRstatus,
                  lotwQslRdate: existingQSO.lotwQslRdate,
                }),
                afterData: JSON.stringify({
                  lotwQslRstatus: dbQSO.lotwQslRstatus,
                  lotwQslRdate: dbQSO.lotwQslRdate,
                }),
              });
            }

            updatedQSOs.push({
              id: existingQSO.id,
              callsign: dbQSO.callsign,
              date: dbQSO.qsoDate,
              band: dbQSO.band,
              mode: dbQSO.mode,
            });
            updatedCount++;
          } else {
            skippedCount++;
          }
        } else if (pendingKeys.has(key)) {
          // Same key already queued for insert in this batch
          skippedCount++;
        } else {
          // New QSO to insert
          pendingKeys.add(key);
          toInsert.push(dbQSO);
          const summary = {
            callsign: dbQSO.callsign,
            date: dbQSO.qsoDate,
            band: dbQSO.band,
            mode: dbQSO.mode,
          };
          batchAdded.push(summary);
          addedQSOs.push(summary);
          addedCount++;
        }
      } catch (error) {
        logger.error('Error processing QSO in batch', { error: error.message, jobId, qso: dbQSO });
        errors.push({ qso: dbQSO, error: error.message });
      }
    }

    // Batch insert new QSOs
    if (toInsert.length > 0) {
      const inserted = await db.insert(qsos).values(toInsert).returning();

      // Assumes returning() yields rows in insertion order (the original
      // index-based bookkeeping relied on the same assumption).
      const pairCount = Math.min(inserted.length, toInsert.length);
      for (let i = 0; i < pairCount; i++) {
        // Attach the real ID to the summary entry reported to the caller
        batchAdded[i].id = inserted[i].id;

        // Track inserted QSOs with their IDs for change tracking
        if (jobId) {
          changeRecords.push({
            jobId,
            qsoId: inserted[i].id,
            changeType: 'added',
            beforeData: null,
            afterData: JSON.stringify({
              callsign: toInsert[i].callsign,
              qsoDate: toInsert[i].qsoDate,
              timeOn: toInsert[i].timeOn,
              band: toInsert[i].band,
              mode: toInsert[i].mode,
            }),
          });
        }
      }
    }

    // Update existing QSOs (one statement per row)
    for (const update of toUpdate) {
      await db
        .update(qsos)
        .set({
          lotwQslRdate: update.lotwQslRdate,
          lotwQslRstatus: update.lotwQslRstatus,
          lotwSyncedAt: update.lotwSyncedAt,
        })
        .where(eq(qsos.id, update.id));
    }

    // Batch insert change records
    if (changeRecords.length > 0) {
      await db.insert(qsoChanges).values(changeRecords);
    }

    // Update job progress after each batch
    if (jobId) {
      await updateJobProgress(jobId, {
        processed: endIdx,
        message: `Processed ${endIdx}/${dbQSOs.length} QSOs...`,
      });
    }

    // Yield to event loop after each batch to allow other requests
    await yieldToEventLoop();
  }

  logger.info('LoTW sync completed', {
    total: dbQSOs.length,
    added: addedCount,
    updated: updatedCount,
    skipped: skippedCount,
    jobId
  });

  // Invalidate award and stats cache for this user since QSOs may have changed
  const deletedCache = invalidateUserCache(userId);
  invalidateStatsCache(userId);
  logger.debug(`Invalidated ${deletedCache} cached award entries and stats cache for user ${userId}`);

  return {
    success: true,
    total: dbQSOs.length,
    added: addedCount,
    updated: updatedCount,
    skipped: skippedCount,
    addedQSOs,
    updatedQSOs,
    errors: errors.length > 0 ? errors : undefined,
  };
}

/**
 * Get QSOs for a user with pagination.
 *
 * @param {number} userId - User ID
 * @param {Object} [filters] - Optional band, mode, confirmed, confirmationType, search
 * @param {Object} [options] - Optional page (default 1) and limit (default 100)
 * @returns {Promise<{qsos: Array, pagination: Object}>} One page of rows plus paging metadata
 */
export async function getUserQSOs(userId, filters = {}, options = {}) {
  const { page = 1, limit = 100 } = options;

  logger.debug('getUserQSOs called', { userId, filters, options });

  const conditions = [eq(qsos.userId, userId)];

  if (filters.band) conditions.push(eq(qsos.band, filters.band));
  if (filters.mode) conditions.push(eq(qsos.mode, filters.mode));
  if (filters.confirmed) conditions.push(eq(qsos.lotwQslRstatus, 'Y'));

  // Confirmation type filter: lotw, dcl, both, any, none
  if (filters.confirmationType) {
    logger.debug('Applying confirmation type filter', { confirmationType: filters.confirmationType });

    if (filters.confirmationType === 'lotw') {
      // LoTW only: Confirmed by LoTW but NOT by DCL
      conditions.push(eq(qsos.lotwQslRstatus, 'Y'));
      conditions.push(
        sql`(${qsos.dclQslRstatus} IS NULL OR ${qsos.dclQslRstatus} != 'Y')`
      );
    } else if (filters.confirmationType === 'dcl') {
      // DCL only: Confirmed by DCL but NOT by LoTW
      conditions.push(eq(qsos.dclQslRstatus, 'Y'));
      conditions.push(
        sql`(${qsos.lotwQslRstatus} IS NULL OR ${qsos.lotwQslRstatus} != 'Y')`
      );
    } else if (filters.confirmationType === 'both') {
      // Both confirmed: Confirmed by LoTW AND DCL
      conditions.push(eq(qsos.lotwQslRstatus, 'Y'));
      conditions.push(eq(qsos.dclQslRstatus, 'Y'));
    } else if (filters.confirmationType === 'any') {
      // Confirmed by at least 1 service: LoTW OR DCL
      conditions.push(
        sql`(${qsos.lotwQslRstatus} = 'Y' OR ${qsos.dclQslRstatus} = 'Y')`
      );
    } else if (filters.confirmationType === 'none') {
      // Not confirmed: Not confirmed by LoTW AND not confirmed by DCL
      conditions.push(
        sql`(${qsos.lotwQslRstatus} IS NULL OR ${qsos.lotwQslRstatus} != 'Y')`
      );
      conditions.push(
        sql`(${qsos.dclQslRstatus} IS NULL OR ${qsos.dclQslRstatus} != 'Y')`
      );
    }
  }

  // Search filter: callsign, entity, or grid
  if (filters.search) {
    // SECURITY: Sanitize search input to prevent injection
    const sanitized = sanitizeSearchInput(filters.search);

    if (sanitized) {
      const searchTerm = `%${sanitized}%`;
      conditions.push(or(
        like(qsos.callsign, searchTerm),
        like(qsos.entity, searchTerm),
        like(qsos.grid, searchTerm)
      ));
    }
  }

  // Use SQL COUNT for efficient pagination (avoids loading all QSOs into memory)
  const [{ count }] = await db
    .select({ count: sql`CAST(count(*) AS INTEGER)` })
    .from(qsos)
    .where(and(...conditions));
  const totalCount = count;

  const offset = (page - 1) * limit;

  const results = await db
    .select()
    .from(qsos)
    .where(and(...conditions))
    .orderBy(desc(qsos.qsoDate), desc(qsos.timeOn))
    .limit(limit)
    .offset(offset);

  return {
    qsos: results,
    pagination: {
      page,
      limit,
      totalCount,
      totalPages: Math.ceil(totalCount / limit),
      hasNext: page * limit < totalCount,
      hasPrev: page > 1,
    },
  };
}

/**
 * Get QSO statistics for a user.
 * Results are served from the stats cache when present and stored there after
 * being computed.
 *
 * @param {number} userId - User ID
 * @returns {Promise<{total: number, confirmed: number, uniqueEntities: number, uniqueBands: number, uniqueModes: number}>}
 */
export async function getQSOStats(userId) {
  // Check cache first
  const cached = getCachedStats(userId);
  if (cached) {
    return cached;
  }

  // Calculate stats from database with performance tracking
  const stats = await trackQueryPerformance('getQSOStats', async () => {
    const [basicStats, uniqueStats] = await Promise.all([
      db.select({
        total: sql`CAST(COUNT(*) AS INTEGER)`,
        confirmed: sql`CAST(SUM(CASE WHEN lotw_qsl_rstatus = 'Y' OR dcl_qsl_rstatus = 'Y' THEN 1 ELSE 0 END) AS INTEGER)`
      }).from(qsos).where(eq(qsos.userId, userId)),

      db.select({
        uniqueEntities: sql`CAST(COUNT(DISTINCT entity) AS INTEGER)`,
        uniqueBands: sql`CAST(COUNT(DISTINCT band) AS INTEGER)`,
        uniqueModes: sql`CAST(COUNT(DISTINCT mode) AS INTEGER)`
      }).from(qsos).where(eq(qsos.userId, userId))
    ]);

    return {
      total: basicStats[0].total,
      // `|| 0` fallbacks cover a null aggregate result
      confirmed: basicStats[0].confirmed || 0,
      uniqueEntities: uniqueStats[0].uniqueEntities || 0,
      uniqueBands: uniqueStats[0].uniqueBands || 0,
      uniqueModes: uniqueStats[0].uniqueModes || 0,
    };
  });

  // Cache results
  setCachedStats(userId, stats);

  return stats;
}

/**
 * Get the date of the last LoTW QSL for a user.
 *
 * @param {number} userId - User ID
 * @returns {Promise<Date|null>} The latest stored QSL-received date, or null
 *   when there is none or the stored value cannot be read as a date
 */
export async function getLastLoTWQSLDate(userId) {
  const [result] = await db
    .select({ maxDate: max(qsos.lotwQslRdate) })
    .from(qsos)
    .where(eq(qsos.userId, userId));

  const rawDate = result?.maxDate;
  if (!rawDate) return null;

  // Keep digits only so both YYYYMMDD and separated forms such as YYYY-MM-DD
  // yield the same year/month/day slices.
  const digits = String(rawDate).replace(/\D/g, '');
  if (digits.length < 8) return null;

  const year = digits.substring(0, 4);
  const month = digits.substring(4, 6);
  const day = digits.substring(6, 8);

  const parsed = new Date(`${year}-${month}-${day}`);
  // An Invalid Date would make the caller's toISOString() throw
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

/**
 * Normalise the result of a delete statement to an affected-row count.
 * The shapes probed here are the ones the original code handled (plain number,
 * `changes`, `rows`, `meta.changes`, `meta.rows`), plus `rowsAffected`.
 *
 * @param {*} result - Value returned by the driver for a delete
 * @returns {number|*} The count found, or 0 when no known field is present
 */
function extractAffectedRows(result) {
  if (!result) return 0;
  if (typeof result === 'number') return result;
  if (result.changes !== undefined) return result.changes;
  if (result.rows !== undefined) return result.rows;
  if (result.meta?.changes !== undefined) return result.meta.changes;
  if (result.meta?.rows !== undefined) return result.meta.rows;
  if (result.rowsAffected !== undefined) return result.rowsAffected;
  return 0;
}

/**
 * Delete all QSOs for a user.
 * Also deletes related qso_changes records to satisfy foreign key constraints.
 *
 * @param {number} userId - User ID
 * @returns {Promise<number>} Number of QSO rows deleted, as reported by the driver
 */
export async function deleteQSOs(userId) {
  logger.debug('Deleting all QSOs for user', { userId });

  // Step 1: Delete qso_changes that reference QSOs for this user.
  // qso_changes has no userId column, so match through a subquery on qsos.
  // The subquery is parameterised and avoids building an ID list into the SQL text.
  const changesResult = await db
    .delete(qsoChanges)
    .where(
      sql`${qsoChanges.qsoId} IN (SELECT ${qsos.id} FROM ${qsos} WHERE ${qsos.userId} = ${userId})`
    );
  const deletedChanges = extractAffectedRows(changesResult);
  logger.debug('Deleted qso_changes', { count: deletedChanges });

  // Step 2: Delete the QSOs
  const result = await db.delete(qsos).where(eq(qsos.userId, userId));

  logger.debug('Delete result', { result, type: typeof result, keys: Object.keys(result || {}) });

  // Drizzle with SQLite/bun:sqlite returns various formats depending on driver
  const count = extractAffectedRows(result);

  logger.info('Deleted QSOs', { userId, count, deletedChanges });

  // Invalidate caches for this user
  await invalidateStatsCache(userId);
  await invalidateUserCache(userId);

  return count;
}

/**
 * Get a single QSO by ID for a specific user
 * @param {number} userId - User ID
 * @param {number} qsoId - QSO ID
 * @returns {Object|null} QSO object or null if not found
 */
export async function getQSOById(userId, qsoId) {
  const [qso] = await db
    .select()
    .from(qsos)
    .where(and(eq(qsos.userId, userId), eq(qsos.id, qsoId)));

  return qso ?? null;
}