You could use select_for_update() with skip_locked=True to prevent rows from being picked up twice when 3 workers run that task at the same time. Like so:
transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
transactions = transactions.select_for_update(skip_locked=True, of=("self",))
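One thing to watch: select_for_update() only takes effect inside a transaction, and the rows are locked when the queryset is evaluated, so the whole thing has to run under transaction.atomic(). A minimal sketch of the task (the Celery decorator, the PROCESSED status, and the event field are my assumptions; WebhookTransaction, WEBHOOK_HANDLERS, and default_handler come from your code):

```python
from celery import shared_task
from django.db import transaction

@shared_task
def process_webhook_transactions():
    with transaction.atomic():  # select_for_update() requires a transaction
        # skip_locked=True makes a concurrent worker skip rows another
        # worker has already locked instead of blocking on them.
        transactions = WebhookTransaction.objects.filter(
            status=WebhookTransaction.UNPROCESSED
        ).select_for_update(skip_locked=True, of=("self",))
        for tx in transactions:
            handler = WEBHOOK_HANDLERS.get(tx.event, default_handler)
            handler(tx)
            tx.status = WebhookTransaction.PROCESSED  # assumed status value
            tx.save(update_fields=["status"])
```

Note the locks are held until the transaction commits, so slow handlers keep every selected row locked for the whole run; that is another reason the chunking below helps.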
But this approach can make one worker instance work much harder than the others: the first task to run selects all the pending transactions, and the rest find few left. You could instead add a dispatcher task, also scheduled every 20 seconds, that splits the unprocessed transactions into smaller chunks (10-20, maybe?) and triggers process_webhook_transactions with each chunk, as sketched below.
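A sketch of that dispatcher, assuming Celery beat schedules dispatch_webhook_transactions every 20 seconds and process_webhook_transactions is changed to take a list of ids (the dispatcher name and chunk size are placeholders):

```python
from celery import shared_task
from django.db import transaction

CHUNK_SIZE = 20  # 10-20, tune to your workload

@shared_task
def dispatch_webhook_transactions():
    # Collect pending ids and fan them out in small chunks.
    ids = list(
        WebhookTransaction.objects
        .filter(status=WebhookTransaction.UNPROCESSED)
        .values_list("id", flat=True)
    )
    for start in range(0, len(ids), CHUNK_SIZE):
        process_webhook_transactions.delay(ids[start:start + CHUNK_SIZE])

@shared_task
def process_webhook_transactions(ids):
    with transaction.atomic():
        # Re-filter on status and use skip_locked so a row dispatched
        # twice by overlapping runs is still only processed once.
        transactions = WebhookTransaction.objects.filter(
            id__in=ids, status=WebhookTransaction.UNPROCESSED
        ).select_for_update(skip_locked=True)
        for tx in transactions:
            handler = WEBHOOK_HANDLERS.get(tx.event, default_handler)
            handler(tx)
            tx.status = WebhookTransaction.PROCESSED
            tx.save(update_fields=["status"])
```

Because each chunk task re-checks UNPROCESSED and skips locked rows, duplicate dispatches are harmless, and the chunks spread evenly across your workers.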
If the handler returned by handler = WEBHOOK_HANDLERS.get(event, default_handler) is asynchronous, I think the chunking approach is also a good fit, because you can run the handlers within each chunk concurrently to speed the task up:
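In that case a chunk can be awaited with asyncio.gather. A sketch under the same assumptions as above, with the locking and status updates left out for brevity (Django's ORM must be called from sync code, so the rows are fetched before entering the event loop):

```python
import asyncio

from celery import shared_task

@shared_task
def process_webhook_transactions(ids):
    # Evaluate the queryset synchronously; only the handlers are async.
    transactions = list(
        WebhookTransaction.objects.filter(
            id__in=ids, status=WebhookTransaction.UNPROCESSED
        )
    )

    async def run_chunk():
        # Run every handler in the chunk concurrently.
        await asyncio.gather(
            *(WEBHOOK_HANDLERS.get(tx.event, default_handler)(tx)
              for tx in transactions)
        )

    asyncio.run(run_chunk())
```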