Wie kann ich sicherstellen, dass ein Job in Bull nicht zweimal ausgeführt wird?

11

Ich habe zwei Funktionen scheduleScan()und scan().

scan()Anrufe , scheduleScan() wenn es nichts anderes außer der Planung eines neuen Scan zu tun , so scheduleScan()kann ein Zeitplan scan(). Es gibt jedoch ein Problem: Einige Jobs werden zweimal ausgeführt.

Ich möchte sicherstellen, dass jeweils nur ein Auftrag bearbeitet wird. Wie kann ich das erreichen? Ich glaube, es hat etwas damit zu tun done()(es wurde in scan () entfernt), aber ich konnte keine Lösung finden.

Bull Version: 3.12.1

Wichtige späte Bearbeitung: scan() Ruft andere Funktionen auf und sie können andere Funktionen aufrufen oder nicht, aber sie sind alle Synchronisierungsfunktionen. Daher rufen sie eine Funktion nur auf, wenn ihre eigenen Jobs abgeschlossen sind. Es gibt nur einen Weg vorwärts. Am Ende des "Baums" nenne ich es, die letzte Funktion ruft ScheduleScan () auf, aber es können nicht zwei Jobs gleichzeitig ausgeführt werden. Jeder einzelne Job beginnt scan()übrigens bei und endet mitscheduleScan(stock, period, milliseconds, 'called by file.js')

export function update(job) {
  // does some calculations, then it may call scheduleScan() or
  // it may call another function, and that could be the one calling
  // scheduleScan() function.
  // For instance, a function like finalize()
}

export function scan(job) {
  update(job)
}


import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)

queue.process(1, (job) => {
  job.progress(100).then(() => {
    scan(job)
  })
})

export function scheduleScan (stock, period, milliseconds, triggeredBy) {
  let uniqueId = stringHash(stock + ':' + period)

  queue.getJob(uniqueId).then(job => {
    if (!job) {
      if (milliseconds) {
        queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
          // console.log('Added with ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      } else {
        queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
          // console.log('Added without ms: ' + stock + ' ' + period)
        }).catch(err => {
          if (err) {
            console.log('Can not add because it exists ' + new Date())
          }
        })
      }
    } else {
      job.getState().then(state => {
        if (state === 'completed') {
          job.remove().then(() => {
            if (milliseconds) {
              queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
                // console.log('Added with ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            } else {
              queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
                // console.log('Added without ms: ' + stock + ' ' + period)
              }).catch(err => {
                if (err) {
                  console.log('Can not add because it exists ' + new Date())
                }
              })
            }
          }).catch(err => {
            if (err) {
              // console.log(err)
            }
          })
        }
      }).catch(err => {
        // console.log(err)
      })
    }
  })
}
salep
quelle
Ich kann keine scanFunktion finden. Können Sie mir helfen?
Muhammad Zeeshan
@ MuhammadZeeshan Ich habe es hinzugefügt, mein Fehler.
Salep

Antworten:

6

Ich glaube, das Problem ist, dass Ihre scanFunktion asynchron ist. Ihre job.progressFunktion ruft also auf scanund ruft dann sofort auf, donesodass die Warteschlange einen anderen Job verarbeiten kann.

Eine Lösung könnte darin bestehen, den doneRückruf als Parameter an Ihre scanund scheduleScanFunktionen zu übergeben und ihn aufzurufen, sobald Sie Ihren Job abgeschlossen haben (oder wenn ein Fehler aufgetreten ist).

Eine andere (bessere) Lösung könnte darin bestehen, sicherzustellen, dass Sie immer ein Promisevon zurückgeben scanund scheduleScandann auf das Versprechen warten, es zu lösen, und dann anrufen done. Stellen Sie in diesem Fall sicher, dass Sie alle Ihre Versprechen in Ihrer scheduleScanFunktion verketten .

queue.process(1, (job, done) => {
  job.progress(100).then(() => {
    scan(job)
        .then(done)
        .catch(done)
  })
})

export function scan() {
   // business logic
   return scheduleScan()
}

// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
    return queue.getJob(..).then(() => {
        ....
        return queue.add()...
        ....
        return queue.add(...)
            .catch(e => {
                 console.log(e);
                 // propogate errors!
                 throw e;
             })

}
jeeves
quelle
Ich habe meine Frage bearbeitet. Können Sie sie bitte noch einmal überprüfen, insbesondere den Teil "Wichtige späte Bearbeitung"? Gilt Ihre Antwort in dieser Situation noch? Vielen Dank.
Salep
1
Ja, es ist noch gültig. Aus Ihrer Bearbeitung, denke ich, scheduledScanwird immer nach allen anderen Synchronisierungsfunktionen in aufgerufen scan. Wenn dies der Fall ist, ist meine Antwort immer noch gültig. scheduleScanscan
Geben Sie
Wieder mein Fehler. Die erste Funktion, update (), befindet sich im Scan, aber update () kann eine andere Funktion wie finalize () aufrufen, und finalize () kann ScheduleScan () aufrufen. Bitte beachten Sie, dass diese in einer Reihenfolge erfolgen, sodass keine Mehrfachanrufe erfolgen. Ich mache dies, um meine App modular zu halten. - Danke
Salep
1
Ja, die gleiche Antwort. Wenn updateAnrufe scheduledScanoder eine beliebige Anzahl von Funktionen zwischen ihnen. Der entscheidende Punkt ist, dass Sie die Versprechen-Kette vom scheduleScangesamten Weg zurück zur scanFunktion zurückgeben müssen. Wenn also scanAufrufe updatewelche Aufrufe finalise..... welche Aufrufe scheduleScandie Versprechenskette aufrufen, müssen alle Funktionsaufrufe zurückgegeben werden, dh stellen Sie einfach sicher, dass Sie das Versprechen von jeder dieser Funktionen zurückgeben.
Jeeves
Also nur um meinen letzten Kommentar zu verdeutlichen. Wenn Sie beispielsweise innerhalb des Scans das Update aufrufen. Sie müssen das Ergebnis der Aktualisierung (ein Versprechen) von der Scanfunktion zurückgeben.
Jeeves
4

Die Scanfunktion ist eine asynchrone Funktion. In Ihrer queue.process()Funktion müssen Sie auf die Scanfunktion warten und dann den done()Rückruf aufrufen .

export async function scan(job) {
  // it does some calculations, then it creates a new schedule.
  return scheduleScan(stock, period, milliseconds, "scan.js");
}

queue.process(1, (job, done) => {
  job.progress(100).then(async() => {
    await scan(job);
    done();
  });
});

export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
    let uniqueId = stringHash(stock + ":" + period);
    try {
      const existingJob = await queue.getJob(uniqueId);
      if (!existingJob) {
        const job = await addJob({
          queue,
          stock,
          period,
          uniqueId,
          milliseconds,
          triggeredBy
        });
        return job;
      } else {
        const jobState = await existingJob.getState();
        if (jobState === "completed") {
          await existingJob.remove();
          const newJob = await addJob({
            queue,
            stock,
            period,
            uniqueId,
            milliseconds,
            triggeredBy
          });
          return newJob;
        }
      }
    } catch (err) {
      throw new Error(err);
    }
}

export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
  if (milliseconds) {
    return queue.add(
      { stock, period, triggeredBy },
      { delay: milliseconds, jobId: uniqueId }
    );
  } else {
    return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
  }
}

Versuche dies! Ich habe versucht, den Code mit async-await ein wenig umzugestalten.

Adithya Sreyaj
quelle
Ich habe meine Frage bearbeitet. Können Sie sie bitte noch einmal überprüfen, insbesondere den Teil "Wichtige späte Bearbeitung"? Gilt Ihre Antwort in dieser Situation noch? Vielen Dank.
Salep