<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:media="http://search.yahoo.com/mrss/"><channel><title><![CDATA[Dmitry's website]]></title><description><![CDATA[Thoughts, tips, and tricks for future self.]]></description><link>https://thedmitry.pw/</link><image><url>https://thedmitry.pw/favicon.png</url><title>Dmitry&apos;s website</title><link>https://thedmitry.pw/</link></image><generator>Ghost 5.78</generator><lastBuildDate>Sun, 19 Apr 2026 01:11:44 GMT</lastBuildDate><atom:link href="https://thedmitry.pw/rss/" rel="self" type="application/rss+xml"/><ttl>60</ttl><item><title><![CDATA[How I connected Powercom IMD-525AP UPS to NUT service]]></title><description><![CDATA[<figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2025/06/imp525-1.jpg" class="kg-image" alt loading="lazy" width="695" height="800" srcset="https://thedmitry.pw/content/images/size/w600/2025/06/imp525-1.jpg 600w, https://thedmitry.pw/content/images/2025/06/imp525-1.jpg 695w"></figure><p>I had an old UPS lying around and decided to put it to use to protect my TrueNAS server.</p><p>To ensure the server could detect a power outage and shut down gracefully, I needed a way to monitor the UPS. 
The most common solution is to use <strong>NUT (Network UPS Tools)</strong>.</p>]]></description><link>https://thedmitry.pw/blog/2025/06/how-i-connected-powercom-imd-525ap-to/</link><guid isPermaLink="false">68600a006aa9810001560d8f</guid><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Sat, 28 Jun 2025 16:39:17 GMT</pubDate><content:encoded><![CDATA[<figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2025/06/imp525-1.jpg" class="kg-image" alt loading="lazy" width="695" height="800" srcset="https://thedmitry.pw/content/images/size/w600/2025/06/imp525-1.jpg 600w, https://thedmitry.pw/content/images/2025/06/imp525-1.jpg 695w"></figure><p>I had an old UPS lying around and decided to put it to use to protect my TrueNAS server.</p><p>To ensure the server could detect a power outage and shut down gracefully, I needed a way to monitor the UPS. The most common solution is to use <strong>NUT (Network UPS Tools)</strong>. NUT lets you broadcast UPS information to other NUT clients, so other systems can access it and react accordingly.</p><p>I set up UPS monitoring on my <strong>Proxmox host</strong>, which runs a Debian-based OS. In the future, I plan to move the NUT service to a <strong>Raspberry Pi</strong>, which can run for many hours on UPS power and later wake up the Proxmox server.</p><p>For now, my main goal was to verify that I could get data from the UPS.</p><p>My UPS is the <strong>Imperial IMD-525AP</strong>, quite an old model. Mine is likely from before 2009, as the <code>usbhid-ups</code> driver didn&apos;t recognize it.</p><h2 id="installing-nut-httpsnetworkupstoolsorg">Installing NUT <a href="https://networkupstools.org/?ref=thedmitry.pw">https://networkupstools.org/</a></h2><pre><code class="language-bash">sudo apt update
sudo apt install nut</code></pre><h2 id="getting-your-ups-vendor-and-product-id">Getting your UPS Vendor and Product ID</h2><p>Connect UPS to a computer with provided USB cable and then use this command:</p><pre><code>lsusb</code></pre><p>You&apos;ll see output similar to this:</p><pre><code class="language-bash">Bus 002 Device 002: ID 174c:3074 ASMedia Technology Inc. ASM1074 SuperSpeed hub
Bus 002 Device 001: ID 1d6b:0003 Linux Foundation 3.0 root hub
Bus 001 Device 004: ID 0d9f:0002 Powercom Co., Ltd Black Knight PRO / WOW Uninterruptible Power Supply (Cypress HID-&gt;COM RS232)
Bus 001 Device 002: ID 174c:2074 ASMedia Technology Inc. ASM1074 High-Speed hub
Bus 001 Device 003: ID 26ce:01a2 ASRock LED Controller
Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub</code></pre><p>You&apos;re interested in the part after <code>ID</code> &#x2014; that&apos;s the <strong>Vendor ID</strong> and <strong>Product ID</strong>, separated by a colon (<code>:</code>).</p><p>In my case:</p><ul><li>Vendor ID: <code>0d9f</code></li><li>Product ID: <code>0002</code></li></ul><h2 id="configuring-nut">Configuring NUT</h2><p>Edit the UPS configuration file:</p><pre><code class="language-bash">sudo nano /etc/nut/ups.conf</code></pre><p>Initially, I tried the <code>usbhid-ups</code> driver, but it didn&#x2019;t work despite many efforts. Eventually, I discovered that my UPS was too old and switched to the <code>powercom</code> driver.</p><figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2025/06/SCR-20250628-qijt.png" class="kg-image" alt loading="lazy" width="2000" height="509" srcset="https://thedmitry.pw/content/images/size/w600/2025/06/SCR-20250628-qijt.png 600w, https://thedmitry.pw/content/images/size/w1000/2025/06/SCR-20250628-qijt.png 1000w, https://thedmitry.pw/content/images/size/w1600/2025/06/SCR-20250628-qijt.png 1600w, https://thedmitry.pw/content/images/2025/06/SCR-20250628-qijt.png 2000w" sizes="(min-width: 720px) 720px"></figure><pre><code># add this to the end of the /etc/nut/ups.conf
[ups]  # this is the name of your ups (you could have many of them)
driver = powercom  # which driver to use
# arguments for driver is here - https://networkupstools.org/docs/man/powercom.html
type = IMP  # extra argument selecting the UPS model series (see the driver docs above)
port = /dev/ttyUSB0
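# tip: right after plugging in the USB cable, standard commands such as
# `sudo dmesg | grep tty` or `ls /dev/ttyUSB*` usually reveal which serial
# device the UPS received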
nobt = true  # If this flag is present, the battery check on startup is skipped
user = root</code></pre><p>To determine your USB port, you can use <a href="https://unix.stackexchange.com/questions/144029/command-to-determine-ports-of-a-device-like-dev-ttyusb0?ref=thedmitry.pw">https://unix.stackexchange.com/questions/144029/command-to-determine-ports-of-a-device-like-dev-ttyusb0</a></p><p>Why <code>user = root</code>? Because <code>/dev/ttyUSB0</code> is owned by <code>root</code> with group <code>dialout</code>, while by default the NUT user belongs to the <code>nut</code> group. I found this advice:</p><pre><code>sudo cp /lib/udev/rules.d/62-nut-usbups.rules /etc/udev/rules.d/
sudo nano /etc/udev/rules.d/62-nut-usbups.rules
# search for PowerCom section and add this 
ATTR{idVendor}==&quot;0d9f&quot;, ATTR{idProduct}==&quot;0002&quot;, MODE=&quot;664&quot;, GROUP=&quot;nut&quot;
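# (instead of replugging, you can also reload the rules in place with the
#  standard udev commands: sudo udevadm control --reload-rules &amp;&amp; sudo udevadm trigger)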
# reconnect the UPS&apos;s USB cable</code></pre><p>But it didn&apos;t work. So I just added the <code>user</code> parameter to the driver settings so NUT could access the USB port.</p><p>To test the connection:</p><pre><code>sudo /lib/nut/powercom -a ups -u root -DDDD</code></pre><pre><code># In case of stuck processes:
sudo killall powercom 
</code></pre><h2 id="enabling-nut-to-be-accessible-via-network">Enabling NUT to be accessible via network</h2><pre><code>sudo nano /etc/nut/upsd.conf</code></pre><p>Add this line at the end; it exposes NUT on all network interfaces:</p><pre><code># add this line to the bottom
LISTEN 0.0.0.0 3493
</code></pre><h2 id="setting-up-a-nut-user">Setting up a NUT User</h2><pre><code>sudo nano /etc/nut/upsd.users</code></pre><p>The next lines define a user called <code>upsmon</code> with a password of your choosing (<code>psswd</code> in this example).</p><p>I also define this user as a <code>primary</code> node, as the UPS is connected to the local computer itself. If you are connecting to a remote NUT server, this would be a <code>secondary</code> node.</p><pre><code>[upsmon]  # name of user
password = psswd  # your password
upsmon primary
actions = SET  # Let the user do certain things with upsd.
instcmds = ALL  # Let the user initiate specific instant commands (like disable beeper, etc...)</code></pre><h2 id="configuring-the-nut-ups-monitor">Configuring the NUT UPS Monitor</h2><pre><code>sudo nano /etc/nut/upsmon.conf</code></pre><p>You need to write a line in the format:</p><pre><code>MONITOR &lt;UPSNAME&gt;@localhost 1 &lt;USERNAME&gt; &lt;PASSWORD&gt; primary</code></pre><p><code>primary</code> basically just tells NUT that this UPS is connected directly to this machine.</p><p>So in my case I wrote:</p><pre><code>MONITOR ups@localhost 1 upsmon psswd primary</code></pre><h2 id="configuring-nut-as-a-net-server">Configuring NUT as a net server</h2><pre><code>sudo nano /etc/nut/nut.conf
# Search for a line - MODE=none
# Change it to:
MODE=netserver</code></pre><h2 id="enabling-and-starting-nut-services">Enabling and starting nut services</h2><pre><code># for the first time
sudo systemctl enable nut-server
sudo systemctl enable nut-monitor
# then (re)start the services
sudo systemctl restart nut-server
sudo systemctl restart nut-monitor

# check the status and errors 
sudo systemctl status nut-server
sudo systemctl status nut-monitor</code></pre><p>If everything is OK, you can query the UPS with:</p><pre><code>upsc &lt;UPSNAME&gt;

$ upsc ups
Init SSL without certificate database
battery.charge: 100.0
device.mfr: PowerCom
device.model: IMP-525AP
device.serial: Unknown
device.type: ups
driver.name: powercom
driver.parameter.nobt: true
driver.parameter.pollinterval: 2
driver.parameter.port: /dev/ttyUSB0
driver.parameter.synchronous: auto
driver.parameter.type: IMP
driver.version: 2.8.0
driver.version.internal: 0.19
input.frequency: 50.00
input.voltage: 224.0
input.voltage.nominal: 220
output.frequency: 50.00
output.voltage: 224.0
ups.load: 9.0
ups.mfr: PowerCom
ups.model: IMP-525AP
ups.model.type: IMP
ups.serial: Unknown
ups.status: OL</code></pre><p><code>ups.status: OL</code> means ONLINE: the UPS is running on wall power.</p><h2 id="configuring-truenas">Configuring TrueNAS</h2><p>Go to <code>System-&gt;Services-&gt;UPS</code>. Enable it and set it to start automatically.</p><figure class="kg-card kg-image-card kg-width-wide"><img src="https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyan.png" class="kg-image" alt loading="lazy" width="2000" height="668" srcset="https://thedmitry.pw/content/images/size/w600/2025/06/SCR-20250628-qyan.png 600w, https://thedmitry.pw/content/images/size/w1000/2025/06/SCR-20250628-qyan.png 1000w, https://thedmitry.pw/content/images/size/w1600/2025/06/SCR-20250628-qyan.png 1600w, https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyan.png 2000w" sizes="(min-width: 1200px) 1200px"></figure><figure class="kg-card kg-image-card kg-card-hascaption"><img src="https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyht.png" class="kg-image" alt loading="lazy" width="1706" height="1604" srcset="https://thedmitry.pw/content/images/size/w600/2025/06/SCR-20250628-qyht.png 600w, https://thedmitry.pw/content/images/size/w1000/2025/06/SCR-20250628-qyht.png 1000w, https://thedmitry.pw/content/images/size/w1600/2025/06/SCR-20250628-qyht.png 1600w, https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyht.png 1706w" sizes="(min-width: 720px) 720px"><figcaption><span style="white-space: pre-wrap;">Write credentials and choose what to do (my server will shut down after 30 seconds of losing power)</span></figcaption></figure><p>Save the settings, then go to <code>Settings-&gt;Shell</code>.</p><p>To check that your TrueNAS can connect to the UPS, run:</p><pre><code>upsc &lt;ups_name&gt;@&lt;nut_server_host&gt;</code></pre><figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyyq.png" class="kg-image" alt loading="lazy" width="1464" height="1038" 
srcset="https://thedmitry.pw/content/images/size/w600/2025/06/SCR-20250628-qyyq.png 600w, https://thedmitry.pw/content/images/size/w1000/2025/06/SCR-20250628-qyyq.png 1000w, https://thedmitry.pw/content/images/2025/06/SCR-20250628-qyyq.png 1464w" sizes="(min-width: 720px) 720px"></figure><p>Yes!</p>]]></content:encoded></item><item><title><![CDATA[Redis tips I wish I knew earlier]]></title><description><![CDATA[Talking about the Python redis package: connection pools, connection leaks, and errors like ConnectionError: Too many connections.
Why to use BlockingConnectionPool and how to properly close a Redis instance.]]></description><link>https://thedmitry.pw/blog/2024/12/redis-tips-i-wish-i-knew-earliesr/</link><guid isPermaLink="false">6766b39261a8d7000172f3b2</guid><category><![CDATA[redis]]></category><category><![CDATA[python_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Sat, 21 Dec 2024 12:55:16 GMT</pubDate><content:encoded><![CDATA[<p>Redis seems to be everywhere these days.<br>I use it all the time myself. But recently, I stumbled upon some fascinating features that I hadn&#x2019;t noticed before.</p><h2 id="surprise-1-connectionpool-blockingconnectionpool"><strong>Surprise</strong> #1: ConnectionPool &amp; BlockingConnectionPool</h2><p>Here&#x2019;s a simple example using the asynchronous version of the Redis library:</p><pre><code class="language-python">import asyncio
from redis.asyncio import Redis

async def ping_redis(redis_client: Redis):
    return await redis_client.ping()

async def main():
    client = Redis(host=&apos;localhost&apos;, port=6379, db=0)
    print(await ping_redis(client))

if __name__ == &apos;__main__&apos;:
    asyncio.run(main())
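
# note: creating Redis() above does not connect by itself; redis-py opens
# a connection lazily, on the first command (the ping() call)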
</code></pre><p>Each command we execute requests a connection to the Redis server.<br>Since we didn&#x2019;t explicitly create a <code>ConnectionPool</code>, it&#x2019;s created automatically for us:</p><pre><code class="language-python">print(client.connection_pool.max_connections)
# Output:
# 2147483648 (this is 2**31)
</code></pre><p>Now let&#x2019;s modify our <code>main()</code> function. Suppose we receive 200 incoming requests, each requiring interaction with Redis:</p><pre><code class="language-python">async def main():
    client = Redis(host=&apos;localhost&apos;, port=6379, db=0)
    
    tasks = [ping_redis(client) for _ in range(200)]
    
    results = await asyncio.gather(*tasks)
    print(len(client.connection_pool._available_connections))
</code></pre><p>After execution, we&#x2019;ll observe that <code>_available_connections == 200</code>, meaning a new connection was created for each request.<br>By default, a single Redis server supports up to <strong>10,000 simultaneous connections</strong>, far fewer than the default connection pool size.</p><p>This could lead to a DoS (Denial of Service) scenario if no measures are taken.</p><p>To avoid exhausting all available server connections, we can use a connection pool that reuses existing connections and limits their total count to something more realistic.</p><div class="kg-card kg-callout-card kg-callout-card-white"><div class="kg-callout-emoji">&#x1F4A1;</div><div class="kg-callout-text">For example, the default connection pool size is set to 8 in Java Redis clients and 10 in Go Redis libraries.</div></div><p>Here&#x2019;s how we can explicitly set <code>max_connections</code> when creating a Redis instance:</p><pre><code class="language-python">async def main():
    client = Redis(host=&apos;localhost&apos;, port=6379, db=0, max_connections=10)
    tasks = [ping_redis(client) for _ in range(200)]
    results = await asyncio.gather(*tasks)
    print(len(client.connection_pool._available_connections))
</code></pre><p>Running this results in:</p><pre><code class="language-python">redis.exceptions.ConnectionError: Too many connections
</code></pre><p>By default, <code>ConnectionPool</code> raises an exception when no connections are available. This isn&#x2019;t ideal &#x2014; I&#x2019;d prefer coroutines to wait for connections to become available.</p><p>For this behavior, we can use <code>BlockingConnectionPool</code>:</p><pre><code class="language-python">from redis.asyncio import Redis, BlockingConnectionPool

async def main():
    pool = BlockingConnectionPool(host=&apos;localhost&apos;, port=6379, db=0, max_connections=10)
    client = Redis(connection_pool=pool)
    tasks = [ping_redis(client) for _ in range(200)]
    results = await asyncio.gather(*tasks)
    print(len(client.connection_pool._available_connections))
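
    # also worth knowing: BlockingConnectionPool accepts a `timeout` argument,
    # the number of seconds a caller waits for a free connection (default 20);
    # timeout=None waits indefinitely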
</code></pre><p>This runs successfully, with <code>_available_connections == 10</code>.<br>Success! &#x1F389;</p><h2 id="surprise-2-connections-leak-closing-resources"><strong>Surprise</strong> #2: Connection Leaks &amp; Closing Resources</h2><p>Here&#x2019;s another surprising issue: after running the application for a long time, you might encounter an error saying it&#x2019;s impossible to connect to Redis.</p><p>This happens because the connection pool isn&#x2019;t closed when the Redis client is closed.</p><pre><code class="language-python">pool = ConnectionPool()
redis = Redis(connection_pool=pool)

await redis.aclose()
# The pool remains open, and connections might leak unless explicitly closed.
</code></pre><p>The idea behind this design is to allow reusing the same connection pool across different parts of an application.</p><p>The confusing part for me was that if you create a Redis instance with both <code>connection_pool</code> and <code>auto_close_connection_pool=True</code>, the latter is ignored:</p><pre><code class="language-python">pool = ConnectionPool()
redis = Redis(connection_pool=pool, auto_close_connection_pool=True)
# auto_close_connection_pool is ignored because you provided the pool manually.
# The library assumes you&#x2019;re responsible for closing the pool.
</code></pre><p>This behavior might seem unintuitive &#x2014; and you&#x2019;re right!<br>Thankfully, in recent versions of <code>redis-py</code>, the <code>auto_close_connection_pool</code> parameter has been marked as <strong>deprecated</strong>.</p><h3 id="the-right-way-to-close-resources">The Right Way to Close Resources</h3><p>You must manually close the connection pool:</p><pre><code class="language-python">pool = ConnectionPool()
redis = Redis(connection_pool=pool)
...
await redis.aclose()
await pool.close()
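
# a safe pattern is to tie both calls together, e.g.:
#   try:
#       ...  # work with redis
#   finally:
#       await redis.aclose()
#       await pool.close()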
</code></pre><p>Alternatively, use the new <code>.from_pool()</code> method:</p><pre><code class="language-python">pool = ConnectionPool()
redis = Redis.from_pool(pool)
...
await redis.aclose()
# The pool is now also closed.
</code></pre><p>Personally, I prefer to call close on the pool manually, just to be sure.</p><p>If you&apos;re using Sentinel, there are <code>ConnectionPool</code> instances under the hood by default, so make sure to experiment with how to open and close pools and connections.</p>]]></content:encoded></item><item><title><![CDATA[Fastapi, async SQLAlchemy, pytest, and Alembic (all using asyncpg)]]></title><description><![CDATA[We'll explore the integration of FastAPI with the new asynchronous SQLAlchemy 2.0, Alembic for migrations, and pytest for testing.
All of that with only asyncpg as our driver]]></description><link>https://thedmitry.pw/blog/2023/08/fastapi-async-sqlalchemy-pytest-and-alembic/</link><guid isPermaLink="false">64d4d13365f2c10001b8264d</guid><category><![CDATA[python_tips]]></category><category><![CDATA[pytest]]></category><category><![CDATA[fastapi]]></category><category><![CDATA[sqlalchemy]]></category><category><![CDATA[alembic]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Sun, 13 Aug 2023 08:15:25 GMT</pubDate><content:encoded><![CDATA[<h2 id="introduction">Introduction</h2><p>In this article, we&apos;ll explore the integration of FastAPI with the new asynchronous SQLAlchemy 2.0. Additionally, we&apos;ll delve into configuring pytest to execute asynchronous tests, allowing compatibility with pytest-xdist. We&apos;ll also cover the application of Alembic for db migrations with an asynchronous database driver.</p><p>The inspiration for this sparked as I delved into the insightful book &quot;<a href="https://www.cosmicpython.com/book/preface.html?ref=thedmitry.pw" rel="noreferrer">Architecture Patterns with Python</a>&quot; authored by Harry Percival &amp; Bob Gregory.</p><p>I also encountered the captivating concept of the &quot;Stairway test,&quot; which is eloquently detailed in the repository by <a href="https://github.com/alvassin/alembic-quickstart/tree/master?ref=thedmitry.pw">https://github.com/alvassin/alembic-quickstart/tree/master</a>. 
This concept profoundly resonated with me and led me to formulate the ideas presented in this post.</p><h3 id="reqirements">Requirements</h3><p>I run this project using Python 3.9; you can probably adapt it to earlier versions easily.</p><p>I use poetry to manage project requirements.</p><p>Source code can be found <a href="https://github.com/wwarne/fastapi_sqlalchemy_v2_alembic?ref=thedmitry.pw" rel="noreferrer">here</a>.</p><h2 id="install-dependencies">Install dependencies</h2><pre><code class="language-bash">$ poetry add fastapi uvicorn uvloop asyncpg alembic pydantic-settings
$ poetry add sqlalchemy --extras asyncio</code></pre><h3 id="install-dev-dependencies">Install dev dependencies</h3><pre><code class="language-bash">$ poetry add --group=dev httpx sqlalchemy-utils pytest yarl mypy black isort</code></pre><h1 id="setting-up-the-database">Setting up the database</h1><p>We&apos;re going to use FastAPI to create a straightforward API designed for user creation and retrieval from a database. Our primary objective is to illustrate the synergy between SQLAlchemy 2.0 and FastAPI, thus the API intricacies won&apos;t be our focal point in this context.</p><p>Let&apos;s start by creating a configuration file that will hold our database connection string. In my preference, I opt for leveraging <code>pydantic-settings</code> in scenarios of this nature. However, feel free to utilize any other method such as <code>os.getenv</code> if that aligns better with your workflow.</p><p>For the sake of clarity, I have encapsulated the entire database URI within a single parameter. It&apos;s important to note that in a real-world scenario, such configuration settings would likely be segregated into discrete entities like db_host, db_port, db_user, db_password, and more.</p><pre><code class="language-python"># app/settings.py
from pathlib import Path

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    app_name: str = &quot;Example API&quot;
    app_host: str = &quot;0.0.0.0&quot;
    app_port: int = 3000

    database_url: str = &quot;postgresql+asyncpg://blog_example_user:password@localhost:5432/blog_example_base&quot;

    project_root: Path = Path(__file__).parent.parent.resolve()

    model_config = SettingsConfigDict(env_file=&quot;.env&quot;)


settings = Settings()
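
# pydantic-settings also reads matching environment variables (matching is
# case-insensitive by default), so exporting DATABASE_URL=... or setting it
# in the .env file overrides the default above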
</code></pre><p>I&apos;m not going to cover how to start a local PostgreSQL database in this post, but you can, for example, use <a href="https://hub.docker.com/_/postgres?ref=thedmitry.pw" rel="noreferrer">the official docker image</a> to start a local database.</p><p>To be able to run the tests, your database user needs the <code>CREATEDB</code> privilege.</p><pre><code class="language-sql">-- Example SQL commands to create a new user with a new database:

CREATE USER &quot;blog_example_user&quot; WITH PASSWORD &apos;password&apos;;
CREATE DATABASE &quot;blog_example_base&quot; OWNER &quot;blog_example_user&quot;;
ALTER USER &quot;blog_example_user&quot; CREATEDB;</code></pre><h3 id="lets-create-our-orm-model">Let&apos;s create our ORM model</h3><p>I prefer to put all ORM-related stuff into an <code>orm</code> module and expose it through its <code>__init__.py</code>.</p><p>So in code I use it like <code>import orm</code>, then <code>query = select(orm.User)...</code></p><p>This way it&apos;s much easier to distinguish between SQLAlchemy models and my business models.</p><p>So, first, let&apos;s add the base class:</p><pre><code class="language-python"># orm/base_model.py
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase

# Default naming convention for all indexes and constraints
# See why this is important and how it would save your time:
# https://alembic.sqlalchemy.org/en/latest/naming.html
convention = {
    &quot;all_column_names&quot;: lambda constraint, table: &quot;_&quot;.join(
        [column.name for column in constraint.columns.values()]
    ),
    &quot;ix&quot;: &quot;ix__%(table_name)s__%(all_column_names)s&quot;,
    &quot;uq&quot;: &quot;uq__%(table_name)s__%(all_column_names)s&quot;,
    &quot;ck&quot;: &quot;ck__%(table_name)s__%(constraint_name)s&quot;,
    &quot;fk&quot;: &quot;fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s&quot;,
    &quot;pk&quot;: &quot;pk__%(table_name)s&quot;,
}
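
# With this convention, for example, an index on user_account.name gets the
# deterministic name ix__user_account__name instead of a backend default.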


class OrmBase(DeclarativeBase):
    metadata = MetaData(naming_convention=convention)  # type: ignore


</code></pre><p>Then, let&apos;s create a session manager for our database. This class will be used as a singleton and will be responsible for abstracting the database connection and session handling:</p><pre><code class="language-python"># orm/session_manager.py
import contextlib
from typing import AsyncIterator, Optional

from sqlalchemy.ext.asyncio import (
    AsyncConnection,
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)


class DatabaseSessionManager:
    def __init__(self) -&gt; None:
        self._engine: Optional[AsyncEngine] = None
        self._sessionmaker: Optional[async_sessionmaker[AsyncSession]] = None

    def init(self, db_url: str) -&gt; None:
        # Just additional example of customization.
        # you can add parameters to init and so on
        if &quot;postgresql&quot; in db_url:
            # These settings are needed to work with pgbouncer in transaction mode
            # because you can&apos;t use prepared statements in such case
            connect_args = {
                &quot;statement_cache_size&quot;: 0,
                &quot;prepared_statement_cache_size&quot;: 0,
            }
        else:
            connect_args = {}
        self._engine = create_async_engine(
            url=db_url,
            pool_pre_ping=True,
            connect_args=connect_args,
        )
        self._sessionmaker = async_sessionmaker(
            bind=self._engine,
            expire_on_commit=False,
        )

    async def close(self) -&gt; None:
        if self._engine is None:
            return
        await self._engine.dispose()
        self._engine = None
        self._sessionmaker = None

    @contextlib.asynccontextmanager
    async def session(self) -&gt; AsyncIterator[AsyncSession]:
        if self._sessionmaker is None:
            raise IOError(&quot;DatabaseSessionManager is not initialized&quot;)
        async with self._sessionmaker() as session:
            try:
                yield session
            except Exception:
                await session.rollback()
                raise

    @contextlib.asynccontextmanager
    async def connect(self) -&gt; AsyncIterator[AsyncConnection]:
        if self._engine is None:
            raise IOError(&quot;DatabaseSessionManager is not initialized&quot;)
        async with self._engine.begin() as connection:
            try:
                yield connection
            except Exception:
                await connection.rollback()
                raise


db_manager = DatabaseSessionManager()</code></pre><p>Notice that we&apos;re using the async version of the <code>create_engine</code> method, which returns an <code>AsyncEngine</code> object. We will also use the async version of the <code>sessionmaker</code> method, which returns an <code>AsyncSession</code> object for committing and rolling back transactions.</p><p>We are going to use <code>init</code> and <code>close</code> methods in FastAPI&apos;s <a href="https://fastapi.tiangolo.com/advanced/events/?h=life&amp;ref=thedmitry.pw#lifespan" rel="noreferrer">lifespan event</a>, to run them during startup and shutdown of our application.</p><p>The benefits of this approach are:</p><ul><li>You can connect to as many databases as needed, which was a problem for me with the middleware approach. (just create a different <code>DatabaseSessionManager</code> for each database)</li><li>Your DB connections are released at application shutdown instead of garbage collection, which means you won&apos;t run into issues if you use <code>uvicorn --reload</code></li><li>Your DB sessions will automatically be closed when the route using the <code>session</code> dependency finishes, so any uncommitted operations will be rolled back.</li></ul><p>Then, we need to create a FastAPI dependency that will be used to get the database session. This dependency will be used in the API views:</p><pre><code class="language-python"># orm/session_manager.py
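# The init/close pair above is meant to be driven from the FastAPI lifespan
# event mentioned in the text; a minimal sketch (placed in your application
# entrypoint, assuming `settings` from app/settings.py):
#
#   from contextlib import asynccontextmanager
#   from fastapi import FastAPI
#
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       db_manager.init(settings.database_url)
#       yield
#       await db_manager.close()
#
#   app = FastAPI(lifespan=lifespan)
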
async def get_session() -&gt; AsyncIterator[AsyncSession]:
    async with db_manager.session() as session:
        yield session</code></pre><p>And we&apos;re done with the database configuration. Now we can create the database models (I just used the model from SQLAlchemy tutorial):</p><pre><code class="language-python"># orm/user_model.py

from typing import Optional

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column

from .base_model import OrmBase


class User(OrmBase):
    __tablename__ = &quot;user_account&quot;

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    fullname: Mapped[Optional[str]]

    def __repr__(self) -&gt; str:
        return f&quot;User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})&quot;
</code></pre><p>It&apos;s the most modern form of Declarative, which is driven from <a href="https://peps.python.org/pep-0484/?ref=thedmitry.pw"><strong>PEP 484</strong></a> type annotations using a special type <a href="https://docs.sqlalchemy.org/en/20/orm/internals.html?ref=thedmitry.pw#sqlalchemy.orm.Mapped"><code>Mapped</code></a>, which indicates attributes to be mapped as particular types. </p><figure class="kg-card kg-bookmark-card kg-card-hascaption"><a class="kg-bookmark-container" href="https://docs.sqlalchemy.org/en/20/tutorial/metadata.html?ref=thedmitry.pw#declaring-mapped-classes"><div class="kg-bookmark-content"><div class="kg-bookmark-title">Working with Database Metadata &#x2014; SQLAlchemy 2.0 Documentation</div><div class="kg-bookmark-description"></div><div class="kg-bookmark-metadata"><img class="kg-bookmark-icon" src="https://www.sqlalchemy.org/favicon.ico" alt><span class="kg-bookmark-author">The Database Toolkit for Python</span></div></div><div class="kg-bookmark-thumbnail"><img src="https://www.sqlalchemy.org/img/sqla_logo.png" alt></div></a><figcaption><p><span style="white-space: pre-wrap;">Read more about Declaring Mapped Classes</span></p></figcaption></figure><pre><code class="language-python"># orm/__init__.py
&quot;&quot;&quot;
Data structures, used in project.

Add your new models here so Alembic could pick them up.

You may do changes in tables, then execute
`alembic revision --message=&quot;Your text&quot; --autogenerate`
and alembic would generate new migration for you
in alembic/versions folder.
&quot;&quot;&quot;
from .base_model import OrmBase
from .session_manager import db_manager, get_session
from .user_model import User

__all__ = [&quot;OrmBase&quot;, &quot;get_session&quot;, &quot;db_manager&quot;, &quot;User&quot;]
</code></pre><p>I use a dunder init file in the orm module to be able to write <code>import orm</code> and then refer to objects like <code>orm.db_manager</code>, <code>orm.User</code>, etc. This approach substantially simplifies the distinction between your SQLAlchemy models and your business-oriented models.</p><h3 id="creating-the-api-views">Creating the API views</h3><p>Now that we have the database configuration and models set up, we can create the API views.</p><p>For simplicity, I&apos;m going to put all API-related models and functions in one file so you can check it easily. In real life you should probably consider splitting them up for organizational clarity.</p><p>Let&apos;s start by creating models for validating the incoming API request and providing a response:</p><pre><code class="language-python"># api/user.py
from pydantic import BaseModel, ConfigDict, Field

class UserCreateRequest(BaseModel):
    name: str = Field(max_length=30)
    fullname: str


class UserResponse(BaseModel):
    id: int
    name: str
    fullname: str

    model_config = ConfigDict(from_attributes=True)</code></pre><p>When you need to return a list of users, many people opt for a simplistic approach such as responding with something like <code>list[User]</code>. After some time they need to add additional information to such an endpoint, and it can&apos;t be done easily.</p><p>So it&apos;s better to use a more flexible response structure from the beginning, like:</p><pre><code class="language-python"># api/user.py
from typing import Literal

class APIUserResponse(BaseModel):
    status: Literal[&apos;ok&apos;] = &apos;ok&apos;
    data: UserResponse


class APIUserListResponse(BaseModel):
    status: Literal[&apos;ok&apos;] = &apos;ok&apos;
    data: list[UserResponse]</code></pre><p>And, finally, our API views. A few extra notes:</p><ul><li>Every field of our <code>UserResponse</code> model is already JSON-compatible (strings and ints). That&apos;s why we can use <code>.model_dump()</code>. If your model has types like <code>UUID</code>, <code>datetime</code>, or other classes, you can use <code>.model_dump(mode=&apos;json&apos;)</code> and Pydantic will automatically convert the output values to JSON-supported types.</li><li>I prefer to return a Response directly rather than use FastAPI&apos;s <code>response_model</code> conversion. For me it&apos;s more convenient, and it&apos;s actually faster. (Check <a href="https://github.com/falkben/fastapi_experiments/?ref=thedmitry.pw">https://github.com/falkben/fastapi_experiments/</a> -&gt; orjson_response.py)</li><li>For the sake of simplicity I run ORM queries right in the API views. In a bigger project it&apos;s better to create an additional service layer and keep all your ORM/SQL queries in one module. If such queries are spread throughout your code base, you are going to regret it later.</li></ul><pre><code class="language-python"># api/user.py
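</code></pre><p>A quick standalone illustration of the first note (this snippet is not part of the project files; the <code>Event</code> model here is hypothetical):</p><pre><code class="language-python">from datetime import datetime

from pydantic import BaseModel


class Event(BaseModel):
    name: str
    created_at: datetime


e = Event(name="demo", created_at=datetime(2023, 8, 1))
# a plain dump keeps Python objects, mode="json" converts them to JSON-safe types
print(e.model_dump())             # {'name': 'demo', 'created_at': datetime.datetime(2023, 8, 1, 0, 0)}
print(e.model_dump(mode="json"))  # {'name': 'demo', 'created_at': '2023-08-01T00:00:00'}</code></pre><pre><code class="language-python"># api/user.py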
import uuid
from typing import Literal

from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

import orm


class UserCreateRequest(BaseModel):
    name: str = Field(max_length=30)
    fullname: str


class UserResponse(BaseModel):
    id: int
    name: str
    fullname: str

    model_config = ConfigDict(from_attributes=True)


class APIUserResponse(BaseModel):
    status: Literal[&quot;ok&quot;] = &quot;ok&quot;
    data: UserResponse


class APIUserListResponse(BaseModel):
    status: Literal[&quot;ok&quot;] = &quot;ok&quot;
    data: list[UserResponse]


router = APIRouter()


@router.get(&quot;/{user_id}/&quot;, response_model=APIUserResponse)
async def get_user(
    user_id: int, session: AsyncSession = Depends(orm.get_session)
) -&gt; JSONResponse:
    user = await session.get(orm.User, user_id)
    if not user:
        return JSONResponse(
            content={&quot;status&quot;: &quot;error&quot;, &quot;message&quot;: &quot;User not found&quot;},
            status_code=status.HTTP_404_NOT_FOUND,
        )
    response_model = UserResponse.model_validate(user)
    return JSONResponse(
        content={
            &quot;status&quot;: &quot;ok&quot;,
            &quot;data&quot;: response_model.model_dump(),
        }
    )


@router.get(&quot;/&quot;, response_model=APIUserListResponse)
async def get_users(session: AsyncSession = Depends(orm.get_session)) -&gt; JSONResponse:
    users_results = await session.scalars(select(orm.User))
    response_data = [
        UserResponse.model_validate(u).model_dump() for u in users_results.all()
    ]
    return JSONResponse(
        content={
            &quot;status&quot;: &quot;ok&quot;,
            &quot;data&quot;: response_data,
        }
    )


@router.post(&quot;/&quot;, response_model=APIUserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_data: UserCreateRequest, session: AsyncSession = Depends(orm.get_session)
) -&gt; JSONResponse:
    user_candidate = orm.User(**user_data.model_dump())
    session.add(user_candidate)
    # Error handling (e.g. catching IntegrityError) is skipped for brevity
    await session.commit()
    await session.refresh(user_candidate)
    response_model = UserResponse.model_validate(user_candidate)
    return JSONResponse(
        content={
            &quot;status&quot;: &quot;ok&quot;,
            &quot;data&quot;: response_model.model_dump(),
        },
        status_code=status.HTTP_201_CREATED,
    )
</code></pre><p>Here we have a simple FastAPI router with three API views: <code>get_user</code>, <code>get_users</code> and <code>create_user</code>. Notice that we use <code>Depends</code> to inject the async database session into the API views.</p><h1 id="setting-up-fastapi">Setting up FastAPI</h1><p>Now that we have the API views set up, we can create the FastAPI application.</p><pre><code class="language-python"># main.py
import contextlib
from typing import AsyncIterator

import uvicorn
from fastapi import FastAPI

import orm
from api import user
from app.settings import settings


@contextlib.asynccontextmanager
async def lifespan(app: FastAPI) -&gt; AsyncIterator[None]:
    orm.db_manager.init(settings.database_url)
    yield
    await orm.db_manager.close()


app = FastAPI(title=&quot;Very simple example&quot;, lifespan=lifespan)
app.include_router(user.router, prefix=&quot;/api/users&quot;, tags=[&quot;users&quot;])

if __name__ == &quot;__main__&quot;:
    # There are a lot of parameters for uvicorn, you should check the docs
    uvicorn.run(
        app,
        host=settings.app_host,
        port=settings.app_port,
    )
</code></pre><p>In order to run our application, we first need to create our database tables. Let&apos;s see how we can do that using Alembic.</p><h3 id="migrations-with-alembic">Migrations with Alembic</h3><p>To get started with Alembic, we use the <code>alembic init</code> command to create the Alembic configuration. We&apos;ll use the <code>async</code> template for this:</p><pre><code class="language-bash">alembic init -t async alembic</code></pre><p>This will create the <code>alembic</code> directory with the Alembic configuration. We&apos;ll need to make a few changes to it.</p><h3 id="alembicini"><strong>alembic.ini</strong></h3><p>Uncomment the <code>file_template</code> line so the names of the migrations are more user-friendly and include dates, which lets us sort them.</p><p>Remove the <code>sqlalchemy.url</code> line, because we are going to set this parameter via <code>alembic/env.py</code>.</p><h3 id="alembicenvpy">alembic/env.py</h3><p>First, we need to import our database models so that they&apos;re added to the <code>Base.metadata</code> object. This happens automatically when a model inherits from <code>OrmBase</code>, but we still have to import the models before the Alembic configuration is loaded. Because we put all models into <code>orm/__init__.py</code>, a single <code>import orm</code> loads them.</p><p>Then, we need to set the <code>sqlalchemy.url</code> configuration option to our database connection string.</p><p>Important note: we are also going to generate an Alembic configuration for tests, so we need to be careful not to overwrite <code>sqlalchemy.url</code> if it&apos;s already set.</p><p>And finally, we&apos;ll point <code>target_metadata</code> to our <code>Base.metadata</code> object.</p><p>Below are the changes we need to make to the <code>alembic/env.py</code> file:</p><pre><code class="language-python"># alembic/env.py

import orm
from app.settings import settings
current_url = config.get_main_option(&quot;sqlalchemy.url&quot;)
if not current_url:
    config.set_main_option(&quot;sqlalchemy.url&quot;, settings.database_url)

target_metadata = orm.OrmBase.metadata</code></pre><p>Now we can run the <code>alembic revision</code> command to create a new revision:</p><pre><code class="language-bash">alembic revision --autogenerate -m &quot;Add user model&quot;</code></pre><p>This will create a new revision file in the <code>alembic/versions</code> directory. We can then run the <code>alembic upgrade head</code> command to apply the migration to the database:</p><pre><code class="language-bash">alembic upgrade head</code></pre><p>To revert the last migration, you can use:</p><pre><code class="language-bash">alembic downgrade -1</code></pre><figure class="kg-card kg-bookmark-card kg-card-hascaption"><a class="kg-bookmark-container" href="https://alembic.sqlalchemy.org/en/latest/tutorial.html?ref=thedmitry.pw"><div class="kg-bookmark-content"><div class="kg-bookmark-title">Tutorial &#x2014; Alembic 1.11.2 documentation</div><div class="kg-bookmark-description"></div><div class="kg-bookmark-metadata"><img class="kg-bookmark-icon" src="https://sqlalchemy.org/favicon.ico" alt></div></div></a><figcaption><p><span style="white-space: pre-wrap;">More about alembic commands</span></p></figcaption></figure><h1 id="starting-the-server">Starting the server</h1><p>To start the server, run <code>python main.py</code>. This will start the server on port 8000 by default. The docs will be available at <code>http://localhost:8000/docs</code>. You should be able to see and run any of the API views that we&apos;ve created.</p><p>This should be enough to start using FastAPI with SQLAlchemy 2.0. However, one important component of software development is testing, so let&apos;s see how we can test our API views.</p><h1 id="testing-the-api-views">Testing the API views</h1><p>For this section, my focus is primarily on demonstrating the mechanics of integration testing with FastAPI and SQLAlchemy 2.0. This means that our tests will call the API views and check the responses. While we won&apos;t be testing the database models, a similar setup can be applied for such scenarios as well.</p><p>We&apos;ll start with the helper functions we are going to need.</p><p>The <code>sqlalchemy_utils</code> package has two very useful functions: <code>create_database</code> and <code>drop_database</code>.</p><p>Regrettably, these functions are synchronous and incompatible with the <code>asyncpg</code> driver. This typically leads tutorials to recommend installing <code>psycopg2</code> and using a separate synchronous engine for database creation. However, in the spirit of experimentation, we can just slightly modify these functions so they use <code>create_async_engine</code>.</p><p>You can see them in the <a href="https://github.com/wwarne/fastapi_sqlalchemy_v2_alembic?ref=thedmitry.pw" rel="noreferrer">GitHub repo</a>.</p><p>The next utilities we need are:</p><pre><code class="language-python"># tests/db_utils.py
import contextlib
import os
import uuid
from argparse import Namespace
from pathlib import Path
from typing import AsyncIterator, Optional, Union

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy_utils.functions.database import (
    _set_url_database,
    _sqlite_file_exists,
    make_url,
)
from sqlalchemy_utils.functions.orm import quote
from yarl import URL

from alembic.config import Config as AlembicConfig
from app.settings import settings


def make_alembic_config(
    cmd_opts: Namespace, base_path: Union[str, Path] = settings.project_root
) -&gt; AlembicConfig:
    # Replace path to alembic.ini file to absolute
    base_path = Path(base_path)
    if not Path(cmd_opts.config).is_absolute():
        cmd_opts.config = str(base_path.joinpath(cmd_opts.config).absolute())
    config = AlembicConfig(
        file_=cmd_opts.config,
        ini_section=cmd_opts.name,
        cmd_opts=cmd_opts,
    )
    # Replace path to alembic folder to absolute
    alembic_location = config.get_main_option(&quot;script_location&quot;)
    if not Path(alembic_location).is_absolute():
        config.set_main_option(
            &quot;script_location&quot;, str(base_path.joinpath(alembic_location).absolute())
        )
    if cmd_opts.pg_url:
        config.set_main_option(&quot;sqlalchemy.url&quot;, cmd_opts.pg_url)
    return config


def alembic_config_from_url(pg_url: Optional[str] = None) -&gt; AlembicConfig:
    &quot;&quot;&quot;Provides python object, representing alembic.ini file.&quot;&quot;&quot;
    cmd_options = Namespace(
        config=&quot;alembic.ini&quot;,  # Config file name
        name=&quot;alembic&quot;,  # Name of section in .ini file to use for Alembic config
        pg_url=pg_url,  # DB URI
        raiseerr=True,  # Raise a full stack trace on error
        x=None,  # Additional arguments consumed by custom env.py scripts
    )
    return make_alembic_config(cmd_opts=cmd_options)


@contextlib.asynccontextmanager
async def tmp_database(db_url: URL, suffix: str = &quot;&quot;, **kwargs) -&gt; AsyncIterator[str]:
    &quot;&quot;&quot;Context manager for creating new database and deleting it on exit.&quot;&quot;&quot;
    tmp_db_name = &quot;.&quot;.join([uuid.uuid4().hex, &quot;tests-base&quot;, suffix])
    tmp_db_url = str(db_url.with_path(tmp_db_name))
    await create_database_async(tmp_db_url, **kwargs)
    try:
        yield tmp_db_url
    finally:
        await drop_database_async(tmp_db_url)

# The next functions are copied from `sqlalchemy_utils` and slightly
# modified to support async engines.
async def create_database_async(
    url: str, encoding: str = &quot;utf8&quot;, template: Optional[str] = None
) -&gt; None:
    url = make_url(url)
    database = url.database
    dialect_name = url.get_dialect().name
    dialect_driver = url.get_dialect().driver

    if dialect_name == &quot;postgresql&quot;:
        url = _set_url_database(url, database=&quot;postgres&quot;)
    elif dialect_name == &quot;mssql&quot;:
        url = _set_url_database(url, database=&quot;master&quot;)
    elif dialect_name == &quot;cockroachdb&quot;:
        url = _set_url_database(url, database=&quot;defaultdb&quot;)
    elif not dialect_name == &quot;sqlite&quot;:
        url = _set_url_database(url, database=None)

    if (dialect_name == &quot;mssql&quot; and dialect_driver in {&quot;pymssql&quot;, &quot;pyodbc&quot;}) or (
        dialect_name == &quot;postgresql&quot;
        and dialect_driver in {&quot;asyncpg&quot;, &quot;pg8000&quot;, &quot;psycopg2&quot;, &quot;psycopg2cffi&quot;}
    ):
        engine = create_async_engine(url, isolation_level=&quot;AUTOCOMMIT&quot;)
    else:
        engine = create_async_engine(url)

    if dialect_name == &quot;postgresql&quot;:
        if not template:
            template = &quot;template1&quot;

        async with engine.begin() as conn:
            text = &quot;CREATE DATABASE {} ENCODING &apos;{}&apos; TEMPLATE {}&quot;.format(
                quote(conn, database), encoding, quote(conn, template)
            )
            await conn.execute(sa.text(text))

    elif dialect_name == &quot;mysql&quot;:
        async with engine.begin() as conn:
            text = &quot;CREATE DATABASE {} CHARACTER SET = &apos;{}&apos;&quot;.format(
                quote(conn, database), encoding
            )
            await conn.execute(sa.text(text))

    elif dialect_name == &quot;sqlite&quot; and database != &quot;:memory:&quot;:
        if database:
            async with engine.begin() as conn:
                await conn.execute(sa.text(&quot;CREATE TABLE DB(id int)&quot;))
                await conn.execute(sa.text(&quot;DROP TABLE DB&quot;))

    else:
        async with engine.begin() as conn:
            text = f&quot;CREATE DATABASE {quote(conn, database)}&quot;
            await conn.execute(sa.text(text))

    await engine.dispose()


async def drop_database_async(url: str) -&gt; None:
    url = make_url(url)
    database = url.database
    dialect_name = url.get_dialect().name
    dialect_driver = url.get_dialect().driver

    if dialect_name == &quot;postgresql&quot;:
        url = _set_url_database(url, database=&quot;postgres&quot;)
    elif dialect_name == &quot;mssql&quot;:
        url = _set_url_database(url, database=&quot;master&quot;)
    elif dialect_name == &quot;cockroachdb&quot;:
        url = _set_url_database(url, database=&quot;defaultdb&quot;)
    elif not dialect_name == &quot;sqlite&quot;:
        url = _set_url_database(url, database=None)

    if dialect_name == &quot;mssql&quot; and dialect_driver in {&quot;pymssql&quot;, &quot;pyodbc&quot;}:
        engine = create_async_engine(url, connect_args={&quot;autocommit&quot;: True})
    elif dialect_name == &quot;postgresql&quot; and dialect_driver in {
        &quot;asyncpg&quot;,
        &quot;pg8000&quot;,
        &quot;psycopg2&quot;,
        &quot;psycopg2cffi&quot;,
    }:
        engine = create_async_engine(url, isolation_level=&quot;AUTOCOMMIT&quot;)
    else:
        engine = create_async_engine(url)

    if dialect_name == &quot;sqlite&quot; and database != &quot;:memory:&quot;:
        if database:
            os.remove(database)
    elif dialect_name == &quot;postgresql&quot;:
        async with engine.begin() as conn:
            # Disconnect all users from the database we are dropping.
            version = conn.dialect.server_version_info
            pid_column = &quot;pid&quot; if (version &gt;= (9, 2)) else &quot;procpid&quot;
            text = &quot;&quot;&quot;
            SELECT pg_terminate_backend(pg_stat_activity.{pid_column})
            FROM pg_stat_activity
            WHERE pg_stat_activity.datname = &apos;{database}&apos;
            AND {pid_column} &lt;&gt; pg_backend_pid();
            &quot;&quot;&quot;.format(
                pid_column=pid_column, database=database
            )
            await conn.execute(sa.text(text))

            # Drop the database.
            text = f&quot;DROP DATABASE {quote(conn, database)}&quot;
            await conn.execute(sa.text(text))
    else:
        async with engine.begin() as conn:
            text = f&quot;DROP DATABASE {quote(conn, database)}&quot;
            await conn.execute(sa.text(text))

    await engine.dispose()
</code></pre><p>Let&apos;s start by creating a <code>conftest.py</code> file in the root of our <code>tests/integration</code> directory. This file will be responsible for setting up the test database. Since this is an intricate setup, let&apos;s break it down into smaller pieces.  We&apos;ll start with the imports:</p><pre><code class="language-python"># tests/conftest.py
from typing import Optional

import pytest
from httpx import AsyncClient
from yarl import URL

import orm
from alembic.command import upgrade
from app.settings import settings
from orm.session_manager import db_manager
from tests.db_utils import alembic_config_from_url, tmp_database</code></pre><p>There isn&apos;t much going on here; we&apos;re just importing the necessary packages. Let&apos;s move on and create our <code>app</code> and <code>client</code> fixtures, used to create the FastAPI test application and the test client:</p><pre><code class="language-python"># tests/conftest.py
@pytest.fixture()
def app():
    from main import app

    yield app


@pytest.fixture()
async def client(session, app):
    async with AsyncClient(app=app, base_url=&quot;http://test&quot;) as client:
        yield client</code></pre><p>Because FastAPI is built on <code>anyio</code>, we can use <code>anyio</code> in our tests as well (many people use <code>pytest-asyncio</code> instead). To skip running the tests on the <code>trio</code> event loop, we need to create a new fixture. With it we can also just write <code>async def test_...</code> test functions without marking them additionally.</p><pre><code class="language-python"># tests/conftest.py
@pytest.fixture(scope=&quot;session&quot;, autouse=True)
def anyio_backend():
    return &quot;asyncio&quot;, {&quot;use_uvloop&quot;: True}</code></pre><p>And now we&apos;re ready to create the database connection and session. Our test connection is scoped to the session, so the same connection is reused for all the tests; it&apos;s best practice to avoid creating a new connection for each test, let alone for each request.</p><pre><code class="language-python"># tests/conftest.py

@pytest.fixture(scope=&quot;session&quot;)
def pg_url():
    &quot;&quot;&quot;Provides base PostgreSQL URL for creating temporary databases.&quot;&quot;&quot;
    return URL(settings.database_url)


@pytest.fixture(scope=&quot;session&quot;)
async def migrated_postgres_template(pg_url):
    &quot;&quot;&quot;
    Creates temporary database and applies migrations.

    Has &quot;session&quot; scope, so is called only once per tests run.
    &quot;&quot;&quot;
    async with tmp_database(pg_url, &quot;pytest&quot;) as tmp_url:
        alembic_config = alembic_config_from_url(tmp_url)
        # Sometimes we have so-called data migrations, which can call
        # various db-related functions, so we point the settings at the
        # temporary database as well
        settings.database_url = tmp_url

        # It is important to always close the connections at the end of such
        # migrations, or we will get errors like
        # `source database is being accessed by other users`

        upgrade(alembic_config, &quot;head&quot;)

        yield tmp_url


@pytest.fixture(scope=&quot;session&quot;)
async def sessionmanager_for_tests(migrated_postgres_template):
    db_manager.init(db_url=migrated_postgres_template)
    # can add another init (redis, etc...)
    yield db_manager
    await db_manager.close()


@pytest.fixture()
async def session(sessionmanager_for_tests):
    async with db_manager.session() as session:
        yield session

    # Clean tables after each test. I tried:
    # 1. Create new database using an empty `migrated_postgres_template` as template
    # (postgres could copy whole db structure)
    # 2. Do TRUNCATE after each test.
    # 3. Do DELETE after each test.
    # DELETE FROM is the fastest
    # https://www.lob.com/blog/truncate-vs-delete-efficiently-clearing-data-from-a-postgres-table
    # BUT DELETE FROM query does not reset any AUTO_INCREMENT counter
    async with db_manager.connect() as conn:
        for table in reversed(orm.OrmBase.metadata.sorted_tables):
            # Clean tables in such order that tables which depend on another go first
            await conn.execute(table.delete())
        await conn.commit()</code></pre><p><code>DELETE FROM</code> does not reset any AUTO_INCREMENT counter, so our <code>user.id</code> values keep growing during a single test run. Consider whether that matters for you; for me it&apos;s no problem, and I don&apos;t want to switch to TRUNCATE.</p><p>Now we can write our first, simple test:</p><pre><code class="language-python"># tests/test_orm_works.py
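</code></pre><p>A short aside first: if the ever-growing ids do bother you, a PostgreSQL-specific alternative for the cleanup step above is <code>TRUNCATE ... RESTART IDENTITY</code>, which also resets the sequences behind the id columns. A hedged sketch (the <code>truncate_all</code> helper is hypothetical, not part of the project):</p><pre><code class="language-python">from sqlalchemy import text


async def truncate_all(conn, metadata):
    # conn is an AsyncConnection, metadata is e.g. orm.OrmBase.metadata;
    # a single statement truncates every table and resets their sequences
    names = ", ".join(t.name for t in metadata.sorted_tables)
    await conn.execute(text(f"TRUNCATE {names} RESTART IDENTITY CASCADE"))</code></pre><pre><code class="language-python"># tests/test_orm_works.py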

from sqlalchemy import text

import orm


async def test_orm_session(session):
    user = orm.User(
        name=&quot;Michael&quot;,
        fullname=&quot;Michael Test Jr.&quot;,
    )
    session.add(user)
    await session.commit()

    rows = await session.execute(text(&apos;SELECT id, name, fullname FROM &quot;user_account&quot;&apos;))
    result = list(rows)[0]
    assert isinstance(result[0], int)
    assert result[1] == &quot;Michael&quot;
    assert result[2] == &quot;Michael Test Jr.&quot;
</code></pre><p>You can now run <code>pytest</code>, and it works.</p><p>But we&apos;re not done yet. We need to add the very useful <code>Stairway test</code>, and in doing so we will face new challenges with Alembic.</p><h3 id="stairway-test">Stairway test</h3><p>This is a simple and efficient method to check that a migration has no typos and rolls back all schema changes. It does not require maintenance: you can add this test to your project once and forget about it.</p><p>In particular, the test detects data types that were created by the <code>upgrade()</code> method and not removed by <code>downgrade()</code>: when creating a table/column, Alembic automatically creates any custom data types specified in the columns (e.g. enums), but does not delete them when the table or column is deleted; the developer has to do that manually.</p><h4 id="how-it-works">How it works</h4><p>The test retrieves the list of all migrations and, for each migration, executes the <code>upgrade</code>, <code>downgrade</code>, <code>upgrade</code> Alembic commands.</p><figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2023/08/stairway-1.gif" class="kg-image" alt loading="lazy" width="1560" height="541" srcset="https://thedmitry.pw/content/images/size/w600/2023/08/stairway-1.gif 600w, https://thedmitry.pw/content/images/size/w1000/2023/08/stairway-1.gif 1000w, https://thedmitry.pw/content/images/2023/08/stairway-1.gif 1560w" sizes="(min-width: 720px) 720px"></figure><p>Let&apos;s add a new test package <code>migrations</code> and create its fixtures:</p><pre><code class="language-python"># tests/migrations/conftest.py

import pytest
from sqlalchemy.ext.asyncio import create_async_engine

from tests.db_utils import alembic_config_from_url, tmp_database


@pytest.fixture()
async def postgres(pg_url):
    &quot;&quot;&quot;
    Creates empty temporary database.
    &quot;&quot;&quot;
    async with tmp_database(pg_url, &quot;pytest&quot;) as tmp_url:
        yield tmp_url


@pytest.fixture()
async def postgres_engine(postgres):
    &quot;&quot;&quot;
    SQLAlchemy engine, bound to temporary database.
    &quot;&quot;&quot;
    engine = create_async_engine(
        url=postgres,
        pool_pre_ping=True,
    )
    try:
        yield engine
    finally:
        await engine.dispose()


@pytest.fixture()
def alembic_config(postgres):
    &quot;&quot;&quot;
    Alembic configuration object, bound to temporary database.
    &quot;&quot;&quot;
    return alembic_config_from_url(postgres)</code></pre><p>And the test itself:</p><pre><code class="language-python"># tests/migrations/test_stairway.py

&quot;&quot;&quot;
This test can find forgotten downgrade methods, data types left undeleted by
downgrade methods, typos and many other errors.

It does not require any maintenance: you add it once and it checks 80% of the
typos and mistakes in your migrations forever.
&quot;&quot;&quot;
import pytest

from alembic.command import downgrade, upgrade
from alembic.config import Config
from alembic.script import Script, ScriptDirectory
from tests.db_utils import alembic_config_from_url


def get_revisions():
    # Create Alembic configuration object
    # (we don&apos;t need database for getting revisions list)
    config = alembic_config_from_url()

    # Get directory object with Alembic migrations
    revisions_dir = ScriptDirectory.from_config(config)

    # Get &amp; sort migrations, from first to last
    revisions = list(revisions_dir.walk_revisions(&quot;base&quot;, &quot;heads&quot;))
    revisions.reverse()
    return revisions


@pytest.mark.parametrize(&quot;revision&quot;, get_revisions())
def test_migrations_stairway(alembic_config: Config, revision: Script):
    upgrade(alembic_config, revision.revision)

    # We need -1 for downgrading first migration (its down_revision is None)
    downgrade(alembic_config, revision.down_revision or &quot;-1&quot;)
    upgrade(alembic_config, revision.revision)
</code></pre><p>Running pytest again, we will get an error:</p><p><code>E           RuntimeError: asyncio.run() cannot be called from a running event loop</code></p><p>That&apos;s because inside the <code>upgrade</code> command Alembic uses <code>asyncio.run</code> to run migrations via the <code>asyncpg</code> driver. That works just fine when we run migration commands from the command line, but during a test run an active asyncio event loop is already in place, and we can&apos;t use <code>asyncio.run</code>.</p><p>We definitely don&apos;t want to rewrite Alembic internals, but we need some way to run an async function from the sync <code>run_migrations_online</code> while an event loop is already running.</p><div class="kg-card kg-callout-card kg-callout-card-red"><div class="kg-callout-emoji">&#x1F4A1;</div><div class="kg-callout-text">There is a better way to fix this. The solution below is kept for historical purposes; the better solution is added further down (marked with the same callout)</div></div><p>I decided to do the following:</p><ul><li>Check for a running event loop; if there is none, we can proceed the standard Alembic way.</li><li>If there is an event loop, we can use <code>asyncio.create_task</code> to wrap our migration command.</li><li>The problem is that we then need some way to <code>await</code> this task inside our pytest fixture, while creating it during the Alembic <code>upgrade</code> command.</li><li>To solve this I decided to add a new variable to <code>conftest.py</code> and set it from Alembic. Yep, it&apos;s kind of a global variable, but I failed to find a more elegant solution.</li></ul><pre><code class="language-python"># tests/conftest.py
#... add to the end
from asyncio import Task

MIGRATION_TASK: Optional[Task] = None

@pytest.fixture(scope=&quot;session&quot;)
async def migrated_postgres_template(pg_url):
    &quot;&quot;&quot;
    Creates temporary database and applies migrations.

    Has &quot;session&quot; scope, so is called only once per tests run.
    &quot;&quot;&quot;
    async with tmp_database(pg_url, &quot;pytest&quot;) as tmp_url:
        alembic_config = alembic_config_from_url(tmp_url)
        upgrade(alembic_config, &quot;head&quot;)

        await MIGRATION_TASK # added line

        yield tmp_url</code></pre><pre><code class="language-python"># alembic/env.py

def run_migrations_online() -&gt; None:
    &quot;&quot;&quot;Run migrations in &apos;online&apos; mode.&quot;&quot;&quot;
    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        # there is no loop, can use asyncio.run
        asyncio.run(run_async_migrations())
        return

    from tests import conftest
conftest.MIGRATION_TASK = asyncio.create_task(run_async_migrations())</code></pre><p>Everything should be fine now, right?! Right?!</p><p>Well, not quite. We&apos;ve got ourselves a new error:</p><pre><code class="language-python">E       AttributeError: &apos;NoneType&apos; object has no attribute &apos;configure&apos;

../alembic/env.py:61: AttributeError</code></pre><p>What is going on? After reading Alembic&apos;s sources, it becomes clear:</p><ol><li>When we run the <code>upgrade</code> command, Alembic loads some data into the <code>context</code> object using a special context manager:</li></ol><pre><code class="language-python">with EnvironmentContext(...):
    script.run_env()
</code></pre><ol start="2"><li>The <code>run_env</code> method loads our <code>alembic/env.py</code> and invokes <code>run_migrations_online()</code>.</li><li>We create an asyncio Task and return from <code>run_migrations_online</code>. That exits the context manager and clears the <code>context</code> object. So by the time the code inside the task actually runs, some parameters are already gone: <code>context</code> is None, which is why we got the error shown above.</li></ol><p>So we need some way to pass data into our async task. To do that I decided to use <code>contextvars</code>: if we set context variables before creating the asyncio Task, the task gets a copy of them and can use them.</p><p>Let&apos;s start with the import and the process of setting the context variable:</p><pre><code class="language-python"># alembic/env.py
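</code></pre><p>First, a standalone demo of the mechanism (unrelated to the project files): a Task snapshots the current context variables at creation time, so a value set before <code>create_task</code> is visible inside the task.</p><pre><code class="language-python">import asyncio
from contextvars import ContextVar

var = ContextVar("var", default="unset")


async def worker():
    # sees the value set by the parent before the task was created
    return var.get()


async def main():
    var.set("from parent")  # set BEFORE creating the task
    return await asyncio.create_task(worker())


print(asyncio.run(main()))  # from parent</code></pre><pre><code class="language-python"># alembic/env.py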
from contextvars import ContextVar

ctx_var: ContextVar[dict[str, Any]] = ContextVar(&quot;ctx_var&quot;)


def run_migrations_online() -&gt; None:
    &quot;&quot;&quot;Run migrations in &apos;online&apos; mode.&quot;&quot;&quot;

    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        # there is no loop, can use asyncio.run
        asyncio.run(run_async_migrations())
        return
    from tests import conftest
    ctx_var.set({
        &quot;config&quot;: context.config,
        &quot;script&quot;: context.script,
        &quot;opts&quot;: context._proxy.context_opts,  # type: ignore
    })
    conftest.MIGRATION_TASK = asyncio.create_task(run_async_migrations())</code></pre><p>The next step is using this contextvar:</p><pre><code class="language-python"># alembic/env.py

def do_run_migrations(connection: Connection) -&gt; None:
    try:
        context.configure(connection=connection, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()
    except AttributeError:
        context_data = ctx_var.get()
        with EnvironmentContext(
                config=context_data[&quot;config&quot;],
                script=context_data[&quot;script&quot;],
                **context_data[&quot;opts&quot;],
        ):
            context.configure(connection=connection, target_metadata=target_metadata)
            with context.begin_transaction():
                context.run_migrations()</code></pre><p>That&apos;s it. Now you can run pytest and see that everything works.</p><div class="kg-card kg-callout-card kg-callout-card-green"><div class="kg-callout-emoji">&#x1F4A1;</div><div class="kg-callout-text">Better solution for running Alembic&apos;s <code spellcheck="false" style="white-space: pre-wrap;">upgrade</code> command from tests (added in December 2025)</div></div><p>Async Alembic calls asyncio.run and sets some context using context managers.<br>So we just need to run it in a separate thread with its own event loop and wait for it to finish.</p><p>So just change <code>conftest.py</code>:</p><pre><code class="language-python">from concurrent.futures.thread import ThreadPoolExecutor

@pytest.fixture(scope=&quot;session&quot;)
async def migrated_postgres_template(pg_url):
    &quot;&quot;&quot;
    Creates temporary database and applies migrations.

    Has &quot;session&quot; scope, so is called only once per tests run.
    &quot;&quot;&quot;
    async with tmp_database(pg_url, &quot;pytest&quot;) as tmp_url:
        alembic_config = alembic_config_from_url(tmp_url)

        # Run Alembic's upgrade in a worker thread: the asyncio.run call inside
        # Alembic then creates its own event loop, separate from the test
        # session's running loop, and .result() blocks until it finishes.
        with ThreadPoolExecutor() as thread_pool:
            thread_pool.submit(upgrade, alembic_config, &apos;head&apos;).result()

        yield tmp_url</code></pre><p>And there is no need to change <code>env.py</code> at all!</p><div class="kg-card kg-callout-card kg-callout-card-green"><div class="kg-callout-emoji">&#x1F4A1;</div><div class="kg-callout-text">End of content added in December 2025</div></div><p></p><p>This setup allows you to use <code>pytest-xdist</code>. That package splits your test suite into chunks and runs each chunk in a separate process, which can speed up your tests if you have a lot of them. And because each process creates its own unique test database, it works without any problems.</p><p>Finally, a simple integration test to show that our API is working.</p><pre><code class="language-python"># tests/test_api.py

from fastapi import status


async def test_my_api(client, app):
    # Test to show that api is working
    response = await client.get(&quot;/api/users/&quot;)
    assert response.status_code == status.HTTP_200_OK
    assert response.json() == {&quot;status&quot;: &quot;ok&quot;, &quot;data&quot;: []}

    response = await client.post(
        &quot;/api/users/&quot;,
        json={&quot;email&quot;: &quot;test@example.com&quot;, &quot;full_name&quot;: &quot;Full Name Test&quot;},
    )
    assert response.status_code == status.HTTP_201_CREATED
    new_user_id = response.json()[&quot;data&quot;][&quot;id&quot;]

    response = await client.get(f&quot;/api/users/{new_user_id}/&quot;)
    assert response.status_code == status.HTTP_200_OK
    assert response.json() == {
        &quot;status&quot;: &quot;ok&quot;,
        &quot;data&quot;: {
            &quot;id&quot;: new_user_id,
            &quot;email&quot;: &quot;test@example.com&quot;,
            &quot;full_name&quot;: &quot;Full Name Test&quot;,
        },
    }

    response = await client.get(&quot;/api/users/&quot;)
    assert response.status_code == status.HTTP_200_OK
    assert len(response.json()[&quot;data&quot;]) == 1</code></pre><pre><code class="language-bash">$ pytest . -vv -x
===================================================================================== test session starts =====================================================================================
platform darwin -- Python 3.9.15, pytest-7.4.0, pluggy-1.2.0 -- /venv-8ZwWMPCX-py3.9/bin/python
cachedir: .pytest_cache
rootdir: /Users/something/blog_article_v2
plugins: anyio-3.7.1
collected 3 items                                                                                                                                                                             

tests/test_api.py::test_my_api PASSED                                                                                                                                                   [ 33%]
tests/test_orm_works.py::test_orm_session PASSED                                                                                                                                        [ 66%]
tests/migrations/test_stairway.py::test_migrations_stairway[revision0] PASSED                                                                                                           [100%]

====================================================================================== 3 passed in 1.48s ======================================================================================</code></pre><p>Hope this is useful to someone and that Google will index this article someday. <code><sup>^.^</sup></code> Have a nice day!</p>]]></content:encoded></item><item><title><![CDATA[Multistage docker build]]></title><description><![CDATA[Problems with C extensions and shared libraries. You can't just copy the virtual environment. I wrote a Python script to search for system dependencies.]]></description><link>https://thedmitry.pw/blog/2021/08/multistage-docker-python-venv/</link><guid isPermaLink="false">61254d8bd799f800014fe419</guid><category><![CDATA[python_tips]]></category><category><![CDATA[docker]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Tue, 24 Aug 2021 21:27:44 GMT</pubDate><content:encoded><![CDATA[<p>Imagine you have a Python application which uses the <code>psycopg2</code> package.</p><p>To install this package you need the <code>libpq-dev</code> system library as well as a C compiler. (Yes, you could install <code>psycopg2-binary</code> without any of this, but it doesn&apos;t really matter which library we pick as an example.)</p><p>Your Dockerfile might look similar to this. (I use a venv to make the multistage build easier later.)</p><figure class="kg-card kg-code-card"><pre><code class="language-yaml">FROM python:3.9.6-slim-buster
# I create venv outside the workdir
# so even if we mount local folder to docker
# it won&apos;t be affected.
RUN python3 -m venv /opt/.venv
# ensure that virtualenv will be active
ENV PATH=&quot;/opt/.venv/bin:$PATH&quot;

RUN apt-get update &amp;&amp; \
    apt-get upgrade -y &amp;&amp; \
    apt-get -y install --no-install-recommends libpq-dev build-essential &amp;&amp; \
    rm -rf /var/lib/apt/lists/*

# Install dependencies:
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY main.py .
CMD [&quot;python&quot;, &quot;main.py&quot;]</code></pre><figcaption>Dockerfile</figcaption></figure><figure class="kg-card kg-code-card"><pre><code class="language-python">import psycopg2

# Connect to your postgres DB
conn = psycopg2.connect(dbname=&quot;test&quot;,
                        user=&quot;postgres&quot;,
                        password=&quot;secret&quot;,
                        host=&quot;db&quot;)

# Open a cursor to perform database operations
cur = conn.cursor()
cur.execute(&quot;SELECT now();&quot;)</code></pre><figcaption>main.py</figcaption></figure><figure class="kg-card kg-code-card"><pre><code class="language-txt">psycopg2==2.9.1</code></pre><figcaption>requirements.txt</figcaption></figure><p>If you run <code>docker build -t multistage .</code> the image will be around 347 MB.</p><p>Actually we don&apos;t need <code>build-essential</code> to run our app, but we have to keep it because of Docker&apos;s layered filesystem. Maybe a multistage approach will help? It definitely will! Let&apos;s take a look.</p><figure class="kg-card kg-code-card"><pre><code class="language-yaml">FROM python:3.9.6-slim-buster AS build-base
RUN python3 -m venv /opt/.venv
# ensure that virtualenv will be active
ENV PATH=&quot;/opt/.venv/bin:$PATH&quot;

RUN apt-get update &amp;&amp; \
    apt-get upgrade -y &amp;&amp; \
    apt-get -y install --no-install-recommends libpq-dev build-essential &amp;&amp; \
    apt-get clean &amp;&amp; \
    rm -rf /var/lib/apt/lists/*

# Install dependencies:
COPY requirements.txt .
RUN pip install -r requirements.txt

FROM python:3.9.6-slim-buster AS release
WORKDIR /code/
ENV PATH=&quot;/opt/.venv/bin:$PATH&quot;
# Copy only virtualenv with all packages
COPY --from=build-base /opt/.venv /opt/.venv
# Run the application:
COPY main.py .
CMD [&quot;python&quot;, &quot;main.py&quot;]</code></pre><figcaption>Dockerfile, simple multistage approach</figcaption></figure><p>The new image takes only 128 MB! The only problem: it doesn&apos;t work.</p><figure class="kg-card kg-image-card kg-card-hascaption"><img src="https://thedmitry.pw/content/images/2021/08/image.png" class="kg-image" alt loading="lazy" width="800" height="118" srcset="https://thedmitry.pw/content/images/size/w600/2021/08/image.png 600w, https://thedmitry.pw/content/images/2021/08/image.png 800w" sizes="(min-width: 720px) 720px"><figcaption>our code won&apos;t even start</figcaption></figure><p>What is <code>libpq.so.5</code>? It&apos;s a shared library: a piece of compiled C code from the PostgreSQL driver which <code>psycopg2</code> uses under the hood (similar to .dll files on Windows). To get it we need to install the <code>libpq5</code> system package via <code>apt-get install</code>.</p><p>So how can one find such dependencies? The idea is simple.</p><!--kg-card-begin: markdown--><ul>
<li>Scan all files inside our virtualenv and find all <code>.so</code> and executable files (these are the only files that can link to other shared libraries).</li>
<li>Use the <code>ldd</code> command to find out which shared libraries such files use.</li>
<li>Use <code>dpkg -S</code> to learn the name of the system package that contains each needed shared library (dpkg is for Debian-based images; other distros have their own similar commands).</li>
</ul>
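<p>To make the <code>dpkg -S</code> step concrete, here is a minimal sketch (the sample output line is invented for illustration) of how one line of its output maps to a package name - the full script below does the same parsing:</p>
<pre><code class="language-python">def package_from_dpkg_line(line):
    # dpkg -S prints lines like 'libpq5:amd64: /usr/lib/x86_64-linux-gnu/libpq.so.5';
    # the package name is everything before the first colon.
    if '/' in line and ':' in line:
        return line.split(':')[0]
    return None

print(package_from_dpkg_line('libpq5:amd64: /usr/lib/x86_64-linux-gnu/libpq.so.5'))
# prints: libpq5
</code></pre>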
<!--kg-card-end: markdown--><p>I created a Python script that does all of this.</p><figure class="kg-card kg-code-card"><pre><code class="language-python">import stat
import subprocess
import sys
from pathlib import Path
from typing import List, Optional, Generator

EXECUTABLE_PERMISSIONS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

def is_executable(filepath: Path) -&gt; bool:
    return bool(filepath.stat().st_mode &amp; EXECUTABLE_PERMISSIONS)

def find_all_executable_or_so_libs(venv_dir: Path) -&gt; List[Path]:
    executable_files = set()
    for f in venv_dir.rglob(&apos;*&apos;):
        if f.is_dir():
            continue
        if f.name.endswith(&apos;.so&apos;) or is_executable(f):
            executable_files.add(f)
    return sorted(list(executable_files))

def extract_lib_paths(dynamic_str: bytes) -&gt; Optional[str]:
    &quot;&quot;&quot;

    &gt;&gt;&gt; extract_lib_paths(b&quot;linux-vdso.so.1 (0x00007ffee695e000)&quot;)

    &gt;&gt;&gt; extract_lib_paths(b&quot;libpthread.so.0 =&gt; /usr/lib/libpthread.so.0 (0x00007f1475154000)&quot;)
    &apos;/usr/lib/libpthread.so.0&apos;
    &gt;&gt;&gt; extract_lib_paths(b&quot;libpthread.so.0 =&gt; libpthread.so.0 (0x00007f1475154000)&quot;)

    &quot;&quot;&quot;
    if b&apos;=&gt;&apos; not in dynamic_str:
        return
    decoded_path = dynamic_str.decode(encoding=&apos;utf-8&apos;).strip()
    dyn_lib_path = decoded_path.split()[2].strip()
    if &apos;/&apos; not in dyn_lib_path:
        return
    return dyn_lib_path


def find_linked_libs(filepaths: List[Path]) -&gt; List[str]:
    result = set()
    for interesting_file in filepaths:
        p = subprocess.Popen([&apos;ldd&apos;, interesting_file.absolute()], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for ln in p.stdout:
            lib_path = extract_lib_paths(ln)
            if lib_path:
                result.add(lib_path)
    return sorted(list(result))


def who_owns_debian(lib_path: str) -&gt; Generator[str, None, None]:
    p = subprocess.Popen([&apos;dpkg&apos;, &apos;-S&apos;, lib_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    lines = p.stdout.readlines()
    for line in lines:
        line = line.decode(&apos;utf-8&apos;)
        if &apos;/&apos; in line and &apos;:&apos; in line:
            package_name = line.split(&apos;:&apos;)[0]
            yield package_name


def collect_package_names_debian(shared_libs_paths: List[str]):
    total_names = set()
    for one_lib_path in shared_libs_paths:
        for pkg_name in who_owns_debian(one_lib_path):
            if pkg_name:
                total_names.add(pkg_name)
    return sorted(list(total_names))

def main(source_dir):
    source_dir = Path(source_dir)
    interesting_files = find_all_executable_or_so_libs(source_dir)
    shared_libs = find_linked_libs(interesting_files)
    all_names = collect_package_names_debian(shared_libs)
    for name in all_names:
        print(name)


if __name__ == &apos;__main__&apos;:
    main(sys.argv[1])
</code></pre><figcaption>find_deps.py</figcaption></figure><p>For other distros you only need to change the <code>who_owns</code> function to use the appropriate command and parse its output.</p><p><code>python find_deps.py /opt/.venv</code> - <strong>be sure not to remove any build dependencies before running it, because that will affect the result.</strong> </p><figure class="kg-card kg-code-card"><pre><code class="language-yaml">COPY requirements.txt .
RUN pip install -r requirements.txt
COPY find_deps.py find_deps.py
RUN python find_deps.py $VIRTUAL_ENV</code></pre><figcaption>new steps in build-base</figcaption></figure><figure class="kg-card kg-image-card"><img src="https://thedmitry.pw/content/images/2021/08/image-3.png" class="kg-image" alt loading="lazy" width="800" height="369" srcset="https://thedmitry.pw/content/images/size/w600/2021/08/image-3.png 600w, https://thedmitry.pw/content/images/2021/08/image-3.png 800w" sizes="(min-width: 720px) 720px"></figure><p>You can save the output to a file:</p><p><code>RUN python find_deps.py $VIRTUAL_ENV &gt; sys_deps.txt</code></p><p>And during the next stage install them automatically like so:</p><pre><code>COPY --from=build-base /sys_deps.txt /sys_deps.txt
RUN cat /sys_deps.txt | xargs apt-get install -y</code></pre><p>But I prefer to list them by hand. The final multistage Dockerfile is below.</p><figure class="kg-card kg-code-card"><pre><code class="language-yaml">FROM python:3.9.6-slim-buster AS build-base
ENV VIRTUAL_ENV=/opt/.venv
RUN python3 -m venv $VIRTUAL_ENV
# ensure that virtualenv will be active
ENV PATH=&quot;$VIRTUAL_ENV/bin:$PATH&quot;

RUN apt-get update &amp;&amp; \
    apt-get upgrade -y &amp;&amp; \
    apt-get -y install --no-install-recommends libpq-dev build-essential &amp;&amp; \
    rm -rf /var/lib/apt/lists/*

# Install dependencies:
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY find_deps.py find_deps.py
RUN python find_deps.py $VIRTUAL_ENV

FROM python:3.9.6-slim-buster AS release
WORKDIR /code/
ENV PATH=&quot;/opt/.venv/bin:$PATH&quot;
# install system dependencies
RUN apt-get update &amp;&amp; \
    apt-get upgrade -y &amp;&amp; \
    apt-get -y install --no-install-recommends libpq5 &amp;&amp; \
    rm -rf /var/lib/apt/lists/*

COPY --from=build-base /opt/.venv /opt/.venv
# Run the application:
COPY main.py .
CMD [&quot;python&quot;, &quot;main.py&quot;]</code></pre><figcaption>Dockerfile final</figcaption></figure><!--kg-card-begin: markdown--><table>
<thead>
<tr>
<th>Step</th>
<th>Image size</th>
<th>Working app?</th>
</tr>
</thead>
<tbody>
<tr>
<td>Full build</td>
<td>347 MB</td>
<td>YES</td>
</tr>
<tr>
<td>Multistage (only venv)</td>
<td>128 MB</td>
<td>NO</td>
</tr>
<tr>
<td>Multistage (venv + system deps)</td>
<td>138 MB</td>
<td>YES</td>
</tr>
</tbody>
</table>
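<p>A rough way to see why the single-stage image stays large (numbers taken from the table above; the split into layers is approximate): Docker&apos;s layered filesystem means a later <code>apt-get remove</code> can only mask files, never shrink earlier layers.</p>
<pre><code class="language-python"># Approximate layer arithmetic, in MB (numbers derived from the table above)
base_plus_venv = 128      # slim Python base + venv with psycopg2
build_tools = 219         # libpq-dev + build-essential layer
runtime_libs = 10         # libpq5 in the release stage

single_stage = base_plus_venv + build_tools   # removing tools later won't reclaim this
multistage = base_plus_venv + runtime_libs

print(single_stage, multistage)
# prints: 347 138
</code></pre>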
<!--kg-card-end: markdown--><p>I think it is worth the trouble. </p>]]></content:encoded></item><item><title><![CDATA[openpyxl can't open xlsx file]]></title><description><![CDATA[Exception: 
Value must be one of {'greaterThanOrEqual', 'notEqual', 'greaterThan', 'lessThan', 'equal', 'lessThanOrEqual'}]]></description><link>https://thedmitry.pw/blog/2021/04/openpyxl/</link><guid isPermaLink="false">607e775e12cea900015513e0</guid><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Tue, 20 Apr 2021 07:28:20 GMT</pubDate><content:encoded><![CDATA[<p>Yesterday I got a very cryptic error.</p><pre><code class="language-python">&gt;&gt;&gt; from openpyxl import load_workbook
&gt;&gt;&gt; wb = load_workbook(&apos;super-important.xlsx&apos;)

  File &quot;.../python3.9/site-packages/openpyxl/descriptors/base.py&quot;, line 128, in __set__
    raise ValueError(self.__doc__)
ValueError: Value must be one of {&apos;lessThanOrEqual&apos;, &apos;lessThan&apos;, &apos;equal&apos;, &apos;greaterThan&apos;, &apos;notEqual&apos;, &apos;greaterThanOrEqual&apos;}</code></pre><p>After I placed a breakpoint inside <code>openpyxl/descriptors/base.py</code> and ran the import under a debugger, I realized it was complaining about an element of type <code>openpyxl.worksheet.filters.CustomFilter</code> which had value = &apos;**none**&apos;</p><p>The <a href="http://www.datypic.com/sc/ooxml/e-ssml_customFilter-1.html?ref=thedmitry.pw">OOXML specification</a> says that a <code>customFilter</code> criterion has a Filter Comparison Operator, which should be one of</p><!--kg-card-begin: html--><table class="en"><tbody><tr><th>Valid value</th><th>Description</th></tr><tr><td>equal</td><td>Equal</td></tr><tr><td>lessThan</td><td>Less Than</td></tr><tr><td>lessThanOrEqual</td><td>Less Than Or Equal</td></tr><tr><td>notEqual</td><td>Not Equal</td></tr><tr><td>greaterThanOrEqual</td><td>Greater Than Or Equal</td></tr><tr><td>greaterThan</td><td>Greater Than</td></tr></tbody></table><!--kg-card-end: html--><p>It seems that my file just does not comply with the OOXML specification.</p><p>So the value <code>**none**</code> might be supported internally by Excel, but it isn&apos;t in the specification. 
Supporting the specification is the only sane way to develop the library, so I can&apos;t blame openpyxl in any way.</p><figure class="kg-card kg-code-card"><pre><code class="language-xml">&lt;filterColumn colId=&quot;2&quot;&gt;&lt;customFilters and=&quot;true&quot;&gt;&lt;customFilter operator=&quot;**none**&quot; val=&quot;&quot;/&gt;&lt;/customFilters&gt;&lt;/filterColumn&gt;</code></pre><figcaption>Here is what I found inside my xlsx file in sheet1.xml</figcaption></figure><p>I managed to work around this by performing <code>Format -&gt; Clear direct formatting (Ctrl+M)</code> in LibreOffice.</p>]]></content:encoded></item><item><title><![CDATA[Creating abstract method]]></title><description><![CDATA[How to create an abstract method using NotImplementedError vs @abstractmethod]]></description><link>https://thedmitry.pw/blog/2021/04/creating-abstract-method/</link><guid isPermaLink="false">607dc65112cea9000155137f</guid><category><![CDATA[python_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Mon, 19 Apr 2021 18:56:00 GMT</pubDate><content:encoded><![CDATA[<p>A very popular way to declare an abstract method in a Python class is to raise the <strong>NotImplementedError</strong> exception:</p><pre><code class="language-python">class SomeWorker:
    def do_work(self):
        raise NotImplementedError</code></pre><p>This approach even has IDE support (I use PyCharm, and at least it supports it).</p><figure class="kg-card kg-image-card kg-card-hascaption"><img src="https://thedmitry.pw/content/images/2021/04/------------1.png" class="kg-image" alt loading="lazy" width="599" height="244"><figcaption>PyCharm shows an error when I try to inherit without implementing all abstract methods</figcaption></figure><p>The only downside of this approach is that you get the error only when the method is called.</p><pre><code class="language-python">&gt;&gt;&gt; w = MyWorker()
# it&apos;s ok
&gt;&gt;&gt; w.do_work()

NotImplementedError</code></pre><p>It would be much better to know about the problem right at class instantiation.</p><p>Python&apos;s abstract base classes are here to help:</p><pre><code class="language-python">from abc import ABCMeta, abstractmethod

class SomeWorker(metaclass=ABCMeta):
    @abstractmethod
    def do_work(self):
        pass</code></pre><p>Now, if you subclass SomeWorker and don&apos;t override <code>do_work</code>, you will get an error right upon class instantiation:</p><pre><code class="language-python">&gt;&gt;&gt; w = MyWorker()
TypeError: Can&apos;t instantiate abstract class MyWorker with abstract methods do_work</code></pre>]]></content:encoded></item><item><title><![CDATA[Ignoring Exceptions]]></title><description><![CDATA[Sometimes you want to ignore some exceptions.]]></description><link>https://thedmitry.pw/blog/2021/03/ignoring-exceptions/</link><guid isPermaLink="false">60435342624b640001f34fab</guid><category><![CDATA[python_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Sat, 06 Mar 2021 10:19:08 GMT</pubDate><content:encoded><![CDATA[<p>Sometimes you don&apos;t care if some operation fails.</p><p>To ignore some exception, you usually do something like this:</p><pre><code class="language-python">some_list = [0, 1, 2, 3, 4]

try:
    print(some_list[42])
except IndexError:
    pass</code></pre><p>That will work (without printing anything), but there is a more expressive and explicit way to do the same:</p><pre><code class="language-python">from contextlib import suppress

some_list = [0, 1, 2, 3, 4]
with suppress(IndexError):
    print(some_list[42])</code></pre>]]></content:encoded></item><item><title><![CDATA[Cyberpunk 2077 & gpuapidx12error.cpp]]></title><description><![CDATA[Every time I tried to run this game I got an error "Gpu Crash for unknown reasons". The reason - Windows LTSB ]]></description><link>https://thedmitry.pw/blog/2021/01/cyberpunk-2077-gpuapidx12error/</link><guid isPermaLink="false">5ff6fc3a624b640001f34f4f</guid><category><![CDATA[games]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Thu, 07 Jan 2021 14:01:51 GMT</pubDate><content:encoded><![CDATA[<p>I was frustrated. </p><p>I have an Intel i7-2600k &amp; a KFA2 GTX 1070 video card, but I couldn&apos;t play Cyberpunk. After hours of googling I had already tried a few different versions of video drivers and installed everything available from Windows Update. The results were the same.</p><p>I tried changing the game&apos;s language - no results. It just didn&apos;t run at all!</p><p><code>gpuapidx12error.cpp(40)</code></p><p>After spending more time on the internet I got the answer: I had the wrong Windows 10 version. I used <code>Windows 10 LTSB Version 1607</code>. I love the LTSB version - it&apos;s more configurable, and I was able to turn off some of the telemetry, etc. And I didn&apos;t have problems with it for many years.</p><p>After upgrading to <code>Windows 10 LTSC 1809</code> I&apos;m finally able to visit Night City. Yay!!</p><p>P.S. I just downloaded an <code>iso</code> with the LTSC version, unpacked it and ran setup.exe from my previous system. 
Then I chose the <code>update</code> option, and after 20-30 minutes and a few reboots my system was updated and all my programs were in place.</p>]]></content:encoded></item><item><title><![CDATA[Mock's return_value & side effect]]></title><description><![CDATA[How side_effect can change the behavior of your mocked method.]]></description><link>https://thedmitry.pw/blog/2020/12/mocks-side-effect/</link><guid isPermaLink="false">5fd1246b2dbff90001efcd75</guid><category><![CDATA[python_tips]]></category><category><![CDATA[mock]]></category><category><![CDATA[testing]]></category><category><![CDATA[pytest]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Wed, 09 Dec 2020 20:14:46 GMT</pubDate><content:encoded><![CDATA[<p>Mock is an object that simulates the behavior of another object. In Python we have a built-in mock module in the <code>unittest</code> library.</p><pre><code class="language-python">from unittest import mock
&gt;&gt;&gt; m = mock.Mock()
# You can try to read some non-existing attribute - it will just return mock.
&gt;&gt;&gt; m.some_attribute
&lt;Mock name=&apos;mock.some_attribute&apos; id=&apos;140126083485648&apos;&gt;

#  You can also try to call a method - it will return another mock too.
&gt;&gt;&gt; m.process_data()
&lt;Mock name=&apos;mock.process_data()&apos; id=&apos;140126083843264&apos;&gt;</code></pre><p>Mock has a <code>return_value</code> attribute to help you simulate specified behavior in tests. It makes the mock simply return a value.</p><pre><code class="language-python">&gt;&gt;&gt; m = mock.Mock()
&gt;&gt;&gt; m.process_data.return_value = &apos;The answer is 42&apos;
&gt;&gt;&gt; m.process_data()
&apos;The answer is 42&apos;
&gt;&gt;&gt; m.process_data()
&apos;The answer is 42&apos;</code></pre><p>You can assign anything: integers, strings, tuples, dicts, classes, class instances, etc.</p><h2 id="side-effect-a-multifunctional-tool">Side effect - a multifunctional tool</h2><p>Mock&apos;s <code>side_effect</code> parameter allows you to change the behavior of the mock. It accepts three types of values and changes its behavior accordingly.</p><h3 id="side_effect-exception">side_effect = Exception</h3><p>The mock will raise the passed exception:</p><pre><code class="language-python">&gt;&gt;&gt; m.simulate_fail.side_effect = ValueError(&apos;Whoops...&apos;)
&gt;&gt;&gt; m.simulate_fail()
Traceback (most recent call last):
[...]
File &quot;.../lib/python3.8/unittest/mock.py&quot;, line 1140, in _execute_mock_call
    raise effect
ValueError: Whoops...</code></pre><h3 id="side_effect-iterable">side_effect = Iterable</h3><p>The mock will yield the values from this iterable on subsequent calls:</p><pre><code class="language-python">&gt;&gt;&gt; m.my_attribute.side_effect = [5, 10, 42, ValueError(&apos;Something happened&apos;)]
&gt;&gt;&gt; m.my_attribute()
5
&gt;&gt;&gt; m.my_attribute()
10
&gt;&gt;&gt; m.my_attribute()
42
&gt;&gt;&gt; m.my_attribute()
Traceback (most recent call last):
[...]
File &quot;.../lib/python3.8/unittest/mock.py&quot;, line 1140, in _execute_mock_call
    raise effect
ValueError: Something happened</code></pre><h3 id="side_effect-callable">side_effect = callable</h3><p>The callable will be executed on each call with the parameters passed to the mocked method. Any callable will do, so it can be a function or a class.</p><pre><code class="language-python"># Class-based example
class Person:
    def __init__(self, name):
        self.name = name

&gt;&gt;&gt; m.my_attribute.side_effect = Person
&gt;&gt;&gt; friend = m.my_attribute(&apos;Max&apos;)
&gt;&gt;&gt; friend.name
&apos;Max&apos;
&gt;&gt;&gt; repr(friend)
&apos;&lt;__main__.Person object at 0x7f71940ad2e0&gt;&apos;

# Function-based example
def log_calls(*args, **kwargs):
    print(f&apos;Called with {args} and {kwargs}&apos;)

&gt;&gt;&gt; m.another_attribute.side_effect = log_calls
&gt;&gt;&gt; m.another_attribute()
Called with () and {}
&gt;&gt;&gt; m.another_attribute(42, name=&apos;Peter&apos;, mood=&apos;Good&apos;)
Called with (42,) and {&apos;name&apos;: &apos;Peter&apos;, &apos;mood&apos;: &apos;Good&apos;}
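# (Illustrative addition, not from the original post.) If a side_effect
# callable returns a value, the mock returns that value; return mock.DEFAULT
# from the callable to fall back to return_value instead.
&gt;&gt;&gt; m.combo.return_value = &apos;fallback&apos;
&gt;&gt;&gt; m.combo.side_effect = lambda *args, **kwargs: mock.DEFAULT
&gt;&gt;&gt; m.combo()
&apos;fallback&apos;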
</code></pre><h2 id="notes">Notes</h2><p><a href="https://github.com/pytest-dev/pytest-mock/?ref=thedmitry.pw">pytest-mock</a> is a thin wrapper around the mock package for easier use with pytest.</p>]]></content:encoded></item><item><title><![CDATA[Django + Dramatiq + APScheduler]]></title><description><![CDATA[My experiments with running background tasks on schedule.]]></description><link>https://thedmitry.pw/blog/2020/12/django-dramatiq-apscheduler/</link><guid isPermaLink="false">5fcf71af2dbff90001efcb61</guid><category><![CDATA[python_tips]]></category><category><![CDATA[django]]></category><category><![CDATA[dramatiq]]></category><category><![CDATA[apscheduler]]></category><category><![CDATA[background tasks]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Tue, 08 Dec 2020 15:08:14 GMT</pubDate><content:encoded><![CDATA[<p>Working on one of my Django projects, I had to run long-running computations in the background. For that I decided to use <code><a href="https://dramatiq.io/?ref=thedmitry.pw">dramatiq</a></code> - a very nice background task processing library. </p><p>After that I had a new task: we needed to run periodic processing tasks (import some data, calculate some statistics, etc.). So I needed some kind of scheduler to start those tasks on time.</p><blockquote><a href="https://apscheduler.readthedocs.io/?ref=thedmitry.pw">APScheduler</a> is the recommended scheduler to use with Dramatiq (dramatiq documentation)</blockquote><p>Here are some approaches I&apos;ve used and my discoveries.</p><h2 id="preparing-steps">Preparing steps</h2><ul><li>I installed the <code>dramatiq</code>, <code>django-dramatiq</code>, and <code>APScheduler</code> packages from pypi.</li><li>I created a new Django app via <code>python manage.py startapp task_scheduler</code></li><li>I added my app to <code>INSTALLED_APPS</code>. 
(NOTE - I use this form instead of just writing <code>task_scheduler</code> to be able to use the <code>AppConfig.ready()</code> hook. You can read about it in the <a href="https://docs.djangoproject.com/en/3.1/ref/applications/?ref=thedmitry.pw#module-django.apps">django documentation</a>.)</li></ul><pre><code class="language-python">INSTALLED_APPS = [
	...
    &apos;django_dramatiq&apos;,
    &apos;task_scheduler.apps.TaskSchedulerConfig&apos;,
]</code></pre><ul><li>I configured <a href="https://github.com/Bogdanp/django_dramatiq?ref=thedmitry.pw">django-dramatiq</a> and started working on periodic tasks.</li></ul><h2 id="task-example">Task example</h2><p>For this article I will be using a very simple task.</p><figure class="kg-card kg-code-card"><pre><code class="language-python">import logging
import time

import dramatiq


@dramatiq.actor()
def process_user_stats():
    &quot;&quot;&quot;Very simple task for demonstrating purpose.&quot;&quot;&quot;
    logging.warning(&apos;Start my long-running task&apos;)
    time.sleep(5)
    logging.warning(&apos;Task ended&apos;)</code></pre><figcaption>tasks.py (django-dramatiq will auto-discover functions in this file)</figcaption></figure><figure class="kg-card kg-code-card"><pre><code class="language-python">import logging
import os

from .tasks import process_user_stats


def periodically_run_job():
    &quot;&quot;&quot;Run by APScheduler. It can prepare data and parameters and then enqueue the background task.&quot;&quot;&quot;
    logging.warning(&apos;It is time to start the dramatiq task&apos;)
    process_user_stats.send()
</code></pre><figcaption>periodic_tasks.py</figcaption></figure><figure class="kg-card kg-code-card"><pre><code>task_scheduler
&#x251C;&#x2500;&#x2500; __init__.py
&#x251C;&#x2500;&#x2500; admin.py
&#x251C;&#x2500;&#x2500; apps.py
&#x251C;&#x2500;&#x2500; migrations
&#x2502;&#xA0;&#xA0; &#x251C;&#x2500;&#x2500; __init__.py
&#x251C;&#x2500;&#x2500; models.py
&#x251C;&#x2500;&#x2500; periodic_tasks.py
&#x251C;&#x2500;&#x2500; tasks.py
&#x2514;&#x2500;&#x2500; views.py
</code></pre><figcaption>Directory structure of the <code>task_scheduler</code> django app.</figcaption></figure><p>If you need my final solution - <strong><a href="#my-final-solution">just click here</a></strong></p><h2 id="simple-and-naive-approach-not-a-good-idea-">Simple and naive approach (not a good idea)</h2><p>At first I decided to use the <code>BackgroundScheduler</code> class from apscheduler. This scheduler runs in the background using a separate thread, so it won&apos;t block the whole application. I updated <code>periodic_tasks.py</code> as follows:</p><figure class="kg-card kg-code-card"><pre><code class="language-python">import logging
import os

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from pytz import UTC

from .tasks import process_user_stats


def periodically_run_job():
    logging.warning(&apos;Starting dramatiq task&apos;)
    process_user_stats.send()


def start_scheduler():
    logging.warning(&apos;Starting background scheduler.&apos;)
    scheduler = BackgroundScheduler(timezone=UTC)
    every_minute = CronTrigger(minute=&apos;*&apos;, timezone=UTC)
    scheduler.add_job(periodically_run_job, every_minute)
    scheduler.start()
    </code></pre><figcaption>periodic_tasks.py</figcaption></figure><p>And then:</p><figure class="kg-card kg-code-card"><pre><code class="language-python">from django.apps import AppConfig


class TaskSchedulerConfig(AppConfig):
    name = &apos;task_scheduler&apos;

    def ready(self):
        from .periodic_tasks import start_scheduler
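        # note: with `manage.py runserver` this ready() hook runs in BOTH the
        # autoreload parent process and the serving child process (see the
        # explanation below). A possible guard, not used in this post, is the
        # flag Django's autoreloader sets in the child:
        #     import os
        #     if os.environ.get('RUN_MAIN') != 'true':
        #         return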
        start_scheduler()
    </code></pre><figcaption>apps.py</figcaption></figure><p>But if you start your django dev server, you will see two <code>Starting background scheduler</code> lines, and background tasks will be executed twice. That&apos;s because <code>manage.py runserver</code> runs django in two separate processes (one for serving requests and another for auto-reload), and each process executes our <code>ready()</code> function.</p><p>In production I use <code>Gunicorn</code>, which forks the main process into additional worker processes, so I would end up with 5 copies (depending on settings) of my BackgroundScheduler and every task would be enqueued 5 times. Not good at all, and that is not the only problem.</p><h3 id="how-can-i-change-gunicorn-settings-to-run-only-one-backgroundscheduler-not-a-good-idea-too-">How can I change Gunicorn settings to run only one BackgroundScheduler? (not a good idea either)</h3><p>Well, you can start <code>gunicorn</code> with the <a href="https://docs.gunicorn.org/en/latest/settings.html?ref=thedmitry.pw#preload-app"><code>--preload</code> option</a>. This option means <code>Load application code before the worker processes are forked</code>. So our code will be executed in the main process, and only after that will gunicorn fork. Why does this help? Look at <code>start_scheduler()</code>:</p><pre><code class="language-python">def start_scheduler():
    # I create the scheduler
    scheduler = BackgroundScheduler(timezone=UTC)
    ...
    # I start the scheduler - it will create a new thread!
    scheduler.start()</code></pre><p>With <code>--preload</code>, the master gunicorn process loads the whole django project into memory, so it executes <code>start_scheduler</code> once and a new background thread is spun up, which is responsible for scheduling jobs. After that gunicorn calls the system&apos;s <strong>fork</strong> method. <strong>BUT forked processes do not inherit the threads of their parent</strong>, so no worker runs the BackgroundScheduler thread.</p><p>Are we good now? Well, kind of. I fixed running under <code>gunicorn</code> but completely forgot that I also need to run dramatiq worker processes to actually execute the background tasks. Each of these processes will load the whole project, run <code>start_scheduler</code>, and I will have a bunch of schedulers again.</p><h3 id="custom-dramatiq-middleware">Custom dramatiq middleware</h3><pre><code class="language-python">########################
# In periodic_tasks.py #
########################
# move the scheduler to a module-level global object
_SCHEDULER = BackgroundScheduler(timezone=UTC)


def start_scheduler():
    every_minute = CronTrigger(minute=&apos;*&apos;, timezone=UTC)
    _SCHEDULER.add_job(periodically_run_job, every_minute)
    _SCHEDULER.start()

#################################
# new file custom_middleware.py #
#################################
class AntiScheduleMiddleware(dramatiq.Middleware):
    def before_worker_boot(self, broker, worker):
        from task_scheduler.periodic_tasks import _SCHEDULER
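        # every dramatiq worker process imports the whole project (which starts
        # _SCHEDULER via ready()), so we shut it down before the worker boots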
        _SCHEDULER.shutdown()

######################
# in django settings #
######################
DRAMATIQ_BROKER = {
    &quot;BROKER&quot;: &quot;dramatiq.brokers.redis.RedisBroker&quot;,
    &quot;OPTIONS&quot;: {
        &quot;url&quot;: &quot;redis://localhost:6379&quot;,
    },
    &quot;MIDDLEWARE&quot;: [
        &quot;dramatiq.middleware.Prometheus&quot;,
        &quot;dramatiq.middleware.AgeLimit&quot;,
        &quot;dramatiq.middleware.TimeLimit&quot;,
        &quot;dramatiq.middleware.Callbacks&quot;,
        &quot;dramatiq.middleware.Retries&quot;,
        &quot;django_dramatiq.middleware.DbConnectionsMiddleware&quot;,
        &quot;django_dramatiq.middleware.AdminMiddleware&quot;,
        &quot;task_scheduler.custom_middleware.AntiScheduleMiddleware&quot;
    ]
}</code></pre><p>It works, but it looks fragile and too complicated. We must not forget about the custom <code>gunicorn</code> settings and the custom <code>dramatiq</code> middleware. I don&apos;t like this kind of code at all.</p><h3 id="my-final-solution">My final solution</h3><p>The easiest solution to understand and maintain, in my opinion, is to start a single scheduler in its own dedicated process. For this task we will use the blocking scheduler, so it will be the only thing running inside that process.</p><ul><li>remove <code>def ready(self):</code> from <code>TaskSchedulerConfig</code></li><li>remove <code>start_scheduler()</code> from <code>periodic_tasks.py</code></li><li>create a <code>run_scheduler</code> command</li></ul><figure class="kg-card kg-code-card"><pre><code class="language-python">from django.core.management.base import BaseCommand
from apscheduler.schedulers.blocking import BlockingScheduler
import pytz

from apscheduler.triggers.cron import CronTrigger

from task_scheduler.periodic_tasks import periodically_run_job


class Command(BaseCommand):
    help = &apos;Run blocking scheduler to create periodical tasks&apos;

    def handle(self, *args, **options):
        self.stdout.write(self.style.NOTICE(&apos;Preparing scheduler&apos;))
        scheduler = BlockingScheduler(timezone=pytz.UTC)
        every_day_at_05_05_utc = CronTrigger(hour=5, minute=5, timezone=pytz.UTC)
        scheduler.add_job(periodically_run_job, every_day_at_05_05_utc)
        # ... add other jobs here
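        # e.g., a second (hypothetical) job on an interval trigger:
        #     from apscheduler.triggers.interval import IntervalTrigger
        #     scheduler.add_job(cleanup_job, IntervalTrigger(hours=1))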
        self.stdout.write(self.style.NOTICE(&apos;Start scheduler&apos;))
        scheduler.start()
</code></pre><figcaption>File <code>task_scheduler/management/commands/run_scheduler.py</code></figcaption></figure><p>To run the scheduler we can use the command <code>python manage.py run_scheduler</code>. How and where to run it depends on your deployment strategy.</p>]]></content:encoded></item><item><title><![CDATA[Tilde `~` in python]]></title><description><![CDATA[Recently I looked at a piece of code and saw strange indexing - items[~index].]]></description><link>https://thedmitry.pw/blog/2020/11/tilde-in-python/</link><guid isPermaLink="false">5f9d1a5a6ce6d400016a7d7b</guid><category><![CDATA[python_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Sun, 01 Nov 2020 11:07:28 GMT</pubDate><content:encoded><![CDATA[<p>At first I was like <code>wtf is that</code>?</p><p>The usages of the <code>tilde</code> symbol I could remember were:</p><ul><li>creating a negated (<code><strong>NOT</strong></code>) query in the django ORM, like:</li></ul><figure class="kg-card kg-code-card"><pre><code class="language-python">from django.db.models import Q

# selects all poll objects with the publication year NOT EQUAL to 2005
Poll.objects.filter(~Q(pub_date__year=2005))</code></pre><figcaption>creating a negated (<code><strong>NOT</strong></code>) query in the django ORM</figcaption></figure><ul><li>inverting boolean masks in pandas</li></ul><figure class="kg-card kg-code-card"><pre><code class="language-python"># df = pd.DataFrame(...some data...)
# select all rows where the column `content_type` contains the text `Specifications`
df[df.content_type.str.contains(&apos;Specifications&apos;, case=False)]
# select all rows where the column `content_type` does NOT contain `Specifications`
df[~df.content_type.str.contains(&apos;Specifications&apos;, case=False)]</code></pre><figcaption>inverting boolean masks in the pandas library</figcaption></figure><p>In Python the <code>~</code> operator means <code>bitwise NOT</code>. It takes an integer and flips every bit: <code>0</code> to <code>1</code> and <code>1</code> to <code>0</code>. As <a href="https://en.wikipedia.org/wiki/Bitwise_operation?ref=thedmitry.pw#NOT">wikipedia</a> says, <strong>NOT x = -x &#x2212; 1</strong> (it&apos;s different for unsigned ints, but that&apos;s not our case). So <code>~0 = -0 - 1 = -1</code> and <code>~1 = -1 - 1 = -2</code>.</p><p>So when indexing a list, the author of <code>items[~index]</code> wanted to take an element from the right side, using a zero-based index from the right as well.</p><pre><code class="language-python">items = [&apos;a&apos;, &apos;b&apos;, &apos;c&apos;, &apos;d&apos;, &apos;e&apos;, &apos;f&apos;]
#         0    1    2    3    4    5  # indexes
#        -6   -5   -4   -3   -2   -1  # negative indexes
#        ~5   ~4   ~3   ~2   ~1   ~0  # tilde indexes
items[0] == items[-6] == &apos;a&apos;  # the first element
items[5] == items[-1] == &apos;f&apos;  # the last element
items[~0] == items[-1] == &apos;f&apos;
items[~1] == items[-2] == &apos;e&apos;</code></pre><p>It&apos;s a very strange way to do indexing. Please don&apos;t do that; just use negative indexes. They are much more common and easier to understand.</p><p>I have also learned that if you want to support the <code>~</code> operator for your own objects, you can implement the magic method <code>__invert__(self)</code>.</p>]]></content:encoded></item><item><title><![CDATA[Rsync vs SCP]]></title><description><![CDATA[A little distinctive feature]]></description><link>https://thedmitry.pw/blog/2020/10/rsync-vs-scp/</link><guid isPermaLink="false">5f86e6236ce6d400016a7c46</guid><category><![CDATA[linux_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Wed, 14 Oct 2020 12:03:03 GMT</pubDate><content:encoded><![CDATA[<p>I use the <code>rsync</code> and <code>scp</code> commands to transfer files from one machine to another.</p><p>Recently I discovered a distinctive feature and am writing it down here so I don&apos;t forget.</p><!--kg-card-begin: markdown--><p><code>rsync</code> is atomic. It copies into a temporary file and then renames this temp file into place.</p>
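<p>The same write-into-a-temp-file-then-rename pattern can be sketched in Python (a minimal illustration of the idea, not how rsync itself is implemented; the <code>atomic_write</code> helper is hypothetical):</p><pre><code class="language-python">import os
import tempfile


def atomic_write(path, data):
    # write into a temporary file in the destination directory...
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    with os.fdopen(fd, 'w') as f:
        f.write(data)
    # ...then atomically rename it over the final name,
    # so a reader never sees a half-written file
    os.replace(tmp, path)</code></pre>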
<!--kg-card-end: markdown--><p><code>scp</code>, on the other hand, creates the destination file right away, so in case of network problems your software can be very unhappy to find a file that is only half-written.</p>]]></content:encoded></item><item><title><![CDATA[Future Plans]]></title><description><![CDATA[What I want to do here]]></description><link>https://thedmitry.pw/blog/2020/10/future-plans/</link><guid isPermaLink="false">5f8034916ce6d400016a7a18</guid><category><![CDATA[news]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Fri, 09 Oct 2020 10:23:19 GMT</pubDate><content:encoded><![CDATA[<p>So I decided to actually start this blog even though I hadn&apos;t prepared all the texts and posts. You can polish your text forever, so I&apos;d better start now and add new posts as I go.</p><p>Alongside writing tips and notes about python, one of my goals is to add comments. I&apos;d love to hear feedback on my posts.</p><p>This CMS has Disqus comments integration but I don&apos;t like this service. It&apos;s slow, it adds a lot of junk scripts to your page, and all your data is saved god knows where. It&apos;s just a piece of shit. I have found a much better option, but I need some free time to get it running and integrated here.</p><p>I also need to add a Privacy policy page because I will need it to activate social login etc. I respect your privacy, and that is why I don&apos;t use Google Analytics, Yandex Metrika, etc. But I&apos;m very curious about how many people will visit my site, so I installed <a href="https://matomo.org/?ref=thedmitry.pw">Matomo</a> - an open-source analytics platform. All the data is saved on my server and not transferred anywhere.
I need to write about that, and about the steps I took to respect my visitors&apos; privacy.</p>]]></content:encoded></item><item><title><![CDATA[OrderDict & dict]]></title><description><![CDATA[Do we need OrderedDict in Python 3.6+?]]></description><link>https://thedmitry.pw/blog/2020/10/orderdict_and_dict/</link><guid isPermaLink="false">5f7e279d77ac480001724b3a</guid><category><![CDATA[python_tips]]></category><dc:creator><![CDATA[Dmitry Plevkov]]></dc:creator><pubDate>Wed, 07 Oct 2020 21:44:35 GMT</pubDate><content:encoded><![CDATA[<p>If <code>dict</code> remembers the order of elements in Python 3.6+, why would you need <code>collections.OrderedDict</code> anymore?</p><p><strong>That&apos;s why:</strong></p><pre><code class="language-python">>>> from collections import OrderedDict
>>> OrderedDict(a=1, b=2) == OrderedDict(b=2, a=1)
False
>>> dict(a=1, b=2) == dict(b=2, a=1)
True</code></pre><p><code>OrderedDict</code> equality is order-sensitive, while plain <code>dict</code> equality only compares keys and values.</p>]]></content:encoded></item></channel></rss>