commit 2d994cf953fc6dd20e54f7f1e1687db564e90ff2
Author: Bùi Thành Nhân <hi@imnhan.com>
Date:   Sun May 24 18:38:47 2020 +0700

    scraping task queue put/pop

diff --git a/src/pytaku_scraper/management/commands/purge_queue.py b/src/pytaku_scraper/management/commands/purge_queue.py
new file mode 100644
index 0000000..fe66b48
--- /dev/null
+++ b/src/pytaku_scraper/management/commands/purge_queue.py
@@ -0,0 +1,17 @@
+from django.core.management.base import BaseCommand
+
+from pytaku_scraper.models import TaskQueue
+
+
+class Command(BaseCommand):
+    help = "Delete all tasks in a queue."
+
+    def add_arguments(self, parser):
+        parser.add_argument("task", choices=["scrape"])
+
+    def handle(self, *args, **options):
+        task = options["task"]
+        assert task == "scrape"
+
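+        # QuerySet.delete() returns (total rows deleted, per-model counts).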
+        count, _ = TaskQueue.objects.filter(name=task).delete()
+        print(f'Deleted {count} "{task}" tasks.')
diff --git a/src/pytaku_scraper/management/commands/put_tasks.py b/src/pytaku_scraper/management/commands/put_tasks.py
new file mode 100644
index 0000000..5e85e94
--- /dev/null
+++ b/src/pytaku_scraper/management/commands/put_tasks.py
@@ -0,0 +1,25 @@
+from django.core.management.base import BaseCommand
+
+from pytaku_scraper.models import TaskQueue
+
+
+class Command(BaseCommand):
+    help = "Puts various tasks."
+
+    def add_arguments(self, parser):
+        parser.add_argument("task", choices=["scrape"])
+        parser.add_argument("start_id", type=int)
+        parser.add_argument("end_id", type=int)
+
+    def handle(self, *args, **options):
+        assert options["task"] == "scrape"
+
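+        # One task per manga id; end_id is inclusive.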
+        result = TaskQueue.put_bulk(
+            "scrape",
+            [
+                {"url": f"https://mangadex.org/api/?type=manga&id={i}"}
+                for i in range(options["start_id"], options["end_id"] + 1)
+            ],
+        )
+
+        print("Result:", result)
diff --git a/src/pytaku_scraper/management/commands/scrape.py b/src/pytaku_scraper/management/commands/scrape.py
new file mode 100644
index 0000000..c1fe0cf
--- /dev/null
+++ b/src/pytaku_scraper/management/commands/scrape.py
@@ -0,0 +1,30 @@
+import time
+
+import requests
+from django.core.management.base import BaseCommand
+from django.db import transaction
+
+from pytaku_scraper.models import ScrapeAttempt, TaskQueue
+
+
+class Command(BaseCommand):
+    help = "Scrape worker. Run as many as needed."
+
+    def handle(self, *args, **options):
+        task_name = "scrape"
+
+        while True:
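+            # Claim one task per iteration. The row stays locked until the
+            # transaction ends; if anything below raises, the rollback
+            # releases the lock and the task goes back to the queue.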
+            with transaction.atomic():
+                task = TaskQueue.pop(task_name)
+                if task is None:
+                    # Queue is empty; back off briefly before polling again.
+                    time.sleep(1)
+                    continue
+                task_id = task.id
+                print(f"Processing task {task_id}: {task.payload}")
+                resp = requests.get(task.payload["url"], timeout=30)
+                assert resp.status_code in (200, 404), f"Unexpected status {resp.status_code}: {resp.text}"
+
+                ScrapeAttempt.objects.create(
+                    url=task.payload["url"],
+                    method="get",  # TODO
+                    resp_body=resp.text,
+                    resp_status=resp.status_code,
+                )
+
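+                # finish() deletes the queue row; it commits atomically
+                # with the ScrapeAttempt insert when the block exits.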
+                task.finish()
+                print("Done task", task_id)
diff --git a/src/pytaku_scraper/migrations/0001_initial.py b/src/pytaku_scraper/migrations/0001_initial.py
new file mode 100644
index 0000000..75291d0
--- /dev/null
+++ b/src/pytaku_scraper/migrations/0001_initial.py
@@ -0,0 +1,43 @@
+# Generated by Django 3.0.5 on 2020-05-24 11:32
+
+import django.contrib.postgres.fields.jsonb
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    initial = True
+
+    dependencies = [
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='ScrapeAttempt',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('scraped_at', models.DateTimeField(auto_now_add=True)),
+                ('url', models.CharField(max_length=1024)),
+                ('method', models.CharField(max_length=7)),
+                ('headers', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
+                ('body', models.TextField()),
+                ('resp_body', models.TextField()),
+                ('resp_status', models.IntegerField()),
+            ],
+            options={
+                'db_table': 'scrape_attempt',
+            },
+        ),
+        migrations.CreateModel(
+            name='TaskQueue',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('created_at', models.DateTimeField(auto_now_add=True)),
+                ('name', models.CharField(choices=[('scrape', 'Scrape')], max_length=100)),
+                ('payload', django.contrib.postgres.fields.jsonb.JSONField(default=dict)),
+            ],
+            options={
+                'db_table': 'task_queue',
+            },
+        ),
+    ]
diff --git a/src/pytaku_scraper/migrations/__init__.py b/src/pytaku_scraper/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/pytaku_scraper/models.py b/src/pytaku_scraper/models.py
index 8ca2a54..0ff5391 100644
--- a/src/pytaku_scraper/models.py
+++ b/src/pytaku_scraper/models.py
@@ -4,6 +4,58 @@
 QUEUE_NAMES = [("scrape", "Scrape")]
 
 
+class TaskQueue(models.Model):
+    """
+    Simple postgres-backed task queue.
+    Supports concurrent consumers thanks to SELECT FOR UPDATE SKIP LOCKED.
+
+    Usage:
+        - Create task: TaskQueue.put(name, payload)
+          or TaskQueue.put_bulk(name, payloads)
+        - Consume task:
+            with transaction.atomic():
+                task = TaskQueue.pop(name)
+                # do work with task
+                task.finish()
+        - If anything raises between pop() and finish(), the transaction
+          rolls back: the row lock is released and the task is effectively
+          put back in the queue.
+    """
+
+    class Meta:
+        db_table = "task_queue"
+
+    created_at = models.DateTimeField(auto_now_add=True)
+    name = models.CharField(max_length=100, choices=QUEUE_NAMES)
+    payload = JSONField(default=dict)
+
+    @classmethod
+    def put(cls, name, payload):
+        return cls.objects.create(name=name, payload=payload)
+
+    @classmethod
+    def put_bulk(cls, name, payloads):
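+        # bulk_create inserts all payloads efficiently (generally a single
+        # query) and returns the created instances.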
+        return cls.objects.bulk_create(
+            [cls(name=name, payload=payload) for payload in payloads]
+        )
+
+    @classmethod
+    def pop(cls, name):
+        """
+        SELECT FOR UPDATE SKIP LOCKED.
+        Must be run inside a transaction.
+
+        Remember to call instance.finish() once you're done.
+        """
+        return (
+            cls.objects.select_for_update(skip_locked=True)
+            .filter(name=name)
+            .order_by("id")
+            .first()
+        )
+
+    def finish(self):
+        return self.delete()
+
+
 class ScrapeAttempt(models.Model):
     class Meta:
         db_table = "scrape_attempt"
@@ -17,11 +69,3 @@ class Meta:
 
     resp_body = models.TextField()
     resp_status = models.IntegerField()
-
-
-class TaskQueue(models.Model):
-    class Meta:
-        db_table = "task_queue"
-
-    created_at = models.DateTimeField(auto_now_add=True)
-    name = models.CharField(max_length=100, choices=QUEUE_NAMES)