python & airflow


Author: crosstrack | Published: 2018-05-18 20:16

    1. MySQL connection error

    Traceback (most recent call last):
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/engine/base.py", line 2147, in _wrap_pool_connect
        return fn()
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 387, in connect
        return _ConnectionFairy._checkout(self)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 766, in _checkout
        fairy = _ConnectionRecord.checkout(pool)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 516, in checkout
        rec = pool._do_get()
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 1229, in _do_get
        return self._create_connection()
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 333, in _create_connection
        return _ConnectionRecord(self)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 461, in __init__
        self.__connect(first_connect_check=True)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/pool.py", line 651, in __connect
        connection = pool._invoke_creator(self)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/engine/strategies.py", line 105, in connect
        return dialect.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.4/dist-packages/sqlalchemy/engine/default.py", line 393, in connect
        return self.dbapi.connect(*cargs, **cparams)
      File "/usr/local/lib/python3.4/dist-packages/pymysql/__init__.py", line 90, in Connect
        return Connection(*args, **kwargs)
      File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 706, in __init__
        self.connect()
      File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 932, in connect
        self._request_authentication()
      File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1152, in _request_authentication
        auth_packet = self._read_packet()
      File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 987, in _read_packet
        packet_header = self._read_bytes(4)
      File "/usr/local/lib/python3.4/dist-packages/pymysql/connections.py", line 1033, in _read_bytes
        CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
    pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
    

    Some answers online suggest checking the value of max_allowed_packet and raising it as much as possible. My current value was:

    mysql> show global variables like 'max_allowed_packet';
    +--------------------+-----------+
    | Variable_name      | Value     |
    +--------------------+-----------+
    | max_allowed_packet | 2635456   | 
    +--------------------+-----------+
    1 row in set (0.00 sec)
    

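    My value was fairly small, so I raised it. Note that SET GLOBAL only applies to connections opened afterwards and is lost when the server restarts; to make the change permanent, also set max_allowed_packet under [mysqld] in my.cnf.

    mysql> set global max_allowed_packet = 2*1024*1024;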
    

    Another option is to increase the timeout values.

    Check the current timeout values:
    mysql> show global variables like '%timeout%';
    +----------------------------+-------+
    | Variable_name              | Value |
    +----------------------------+-------+
    | connect_timeout            | 10    | 
    | delayed_insert_timeout     | 300   | 
    | innodb_lock_wait_timeout   | 100   | 
    | innodb_rollback_on_timeout | OFF   | 
    | interactive_timeout        | 28800 | 
    | net_read_timeout           | 30    | 
    | net_write_timeout          | 60    | 
    | slave_net_timeout          | 3600  | 
    | table_lock_wait_timeout    | 200   | 
    | wait_timeout               | 28800 | 
    +----------------------------+-------+
    10 rows in set (0.00 sec)
    

    Change the values:

    mysql> set global net_read_timeout = 120; 
    Query OK, 0 rows affected (0.03 sec)
    
    mysql> set global net_write_timeout = 900;
    Query OK, 0 rows affected (0.00 sec)
    
    mysql> show global variables like '%timeout%';
    +----------------------------+-------+
    | Variable_name              | Value |
    +----------------------------+-------+
    | connect_timeout            | 10    | 
    | delayed_insert_timeout     | 300   | 
    | innodb_lock_wait_timeout   | 100   | 
    | innodb_rollback_on_timeout | OFF   | 
    | interactive_timeout        | 28800 | 
    | net_read_timeout           | 120   | 
    | net_write_timeout          | 900   | 
    | slave_net_timeout          | 3600  | 
    | table_lock_wait_timeout    | 200   | 
    | wait_timeout               | 28800 | 
    +----------------------------+-------+
    10 rows in set (0.00 sec)
    

    I have not yet verified whether these changes help.
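The server-side settings above have client-side counterparts. The traceback shows SQLAlchemy connecting through PyMySQL, and `pymysql.connect()` accepts `connect_timeout`, `read_timeout`, `write_timeout` and `max_allowed_packet` keyword arguments. Below is a minimal sketch that collects them. The helper names `pymysql_connect_args` and `make_engine` are hypothetical, and the numbers are illustrative assumptions rather than values verified against this error. The client-side packet limit does not raise the server's `max_allowed_packet`, so both sides have to be large enough.

```python
def pymysql_connect_args(connect_timeout=10, read_timeout=120,
                         write_timeout=900, max_packet_mb=16):
    """Build keyword arguments for pymysql.connect() (hypothetical helper).

    Timeouts are in seconds and mirror the net_read_timeout /
    net_write_timeout values set on the server above. max_allowed_packet
    is the client-side limit in bytes (PyMySQL's own default is 16 MiB).
    """
    return {
        "connect_timeout": connect_timeout,
        "read_timeout": read_timeout,
        "write_timeout": write_timeout,
        "max_allowed_packet": max_packet_mb * 1024 * 1024,
    }


def make_engine(url):
    """Create a SQLAlchemy engine using the options above (hypothetical).

    pool_recycle=3600 replaces pooled connections older than one hour,
    well below the server's wait_timeout of 28800 s, so the pool does not
    hand out connections the server has already dropped.
    """
    # Imported lazily so this module loads without SQLAlchemy/PyMySQL.
    from sqlalchemy import create_engine
    return create_engine(url, connect_args=pymysql_connect_args(),
                         pool_recycle=3600)
```

For example, `make_engine("mysql+pymysql://user:password@127.0.0.1:3306/airflow")` uses placeholder credentials and database name.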

    2. Airflow deadlock

    After I ran the backfill command, it ran for a long time and finally failed with:

    Traceback (most recent call last):
     File "/anaconda3/bin/airflow", line 28, in <module>
       args.func(args)
     File "/anaconda3/lib/python3.5/site-packages/airflow/bin/cli.py", line 167, in backfill
       pool=args.pool)
     File "/anaconda3/lib/python3.5/site-packages/airflow/models.py", line 3330, in run
       job.run()
     File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 200, in run
       self._execute()
     File "/anaconda3/lib/python3.5/site-packages/airflow/jobs.py", line 2021, in _execute
       raise AirflowException(err)
    airflow.exceptions.AirflowException: ---------------------------------------------------
    Here is output about tasks.
    
    BackfillJob is deadlocked. These tasks have succeeded:
    set()
    These tasks have started:
    {}
    These tasks have failed:
    set()
    These tasks are skipped:
    set()
    These tasks are deadlocked:
    

    Solution 1
    The suggested solution is:

    To resolve this situation you can do one of the following:
    
    1. Use airflow clear <<dag_id>>. This will resolve the deadlock and allow future runs of the DAG/task.
    2. If the above does not solve the issue, you would need to use airflow resetdb. This would clear the airflow database and hence resolve the issue.
    In future,
    
    try to use execution_timeout=timedelta(minutes=2) to set a timeout, so that you have explicit control over the operator.
    Also, provide an on_failure_callback=handle_failure, which would cleanly exit the operator on failure.
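The two suggestions in the quote correspond to real operator arguments: `execution_timeout`, which takes a `timedelta`, and `on_failure_callback`, a callable that Airflow invokes with the task context dict. Here is a minimal sketch that supplies both through `default_args`. The body of `handle_failure` is hypothetical and only logs. A failure callback runs after the task has already failed, so it is a place for cleanup, not a way to stop the task.

```python
import logging
from datetime import timedelta

log = logging.getLogger(__name__)


def handle_failure(context):
    """on_failure_callback: Airflow calls this with the task context dict.

    Hypothetical body: log which task instance failed. Cleanup such as
    closing connections or deleting temp files would also go here.
    """
    ti = context.get("task_instance")
    log.error("Task failed: %s", ti)


default_args = {
    # Fail a task try that runs longer than 2 minutes.
    "execution_timeout": timedelta(minutes=2),
    "on_failure_callback": handle_failure,
}

# In the DAG file (requires Airflow; dag id and dates are placeholders):
#   from datetime import datetime
#   from airflow import DAG
#   dag = DAG("my_dag", default_args=default_args,
#             start_date=datetime(2018, 5, 1), schedule_interval="@daily")
```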
    

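    My impression is that even while backfill is running, you need to watch out for conflicts between the scheduler's retries and the backfill's own runs. Make sure only one of the two is running at a time: either wait for the scheduler's retries
    to finish and then backfill, or stop the scheduler and run backfill directly.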

    Solution 2

    Try deleting the DAG's entries from the dag_run table, then restart the scheduler.

    What I did:
    0. First, stop the scheduler.
    1. Open the DAG Runs page in the web UI.



    2. Find the runs of the relevant DAG and tick their checkboxes.


    3. Delete them.
    4. Restart the scheduler.
    The DAG started running again.
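Steps 1 to 3 delete rows from the `dag_run` table in Airflow's metadata database. Below is a minimal sketch of the equivalent SQL. It runs against an in-memory SQLite table so that it is self-contained. The three-column `dag_run` schema and the `my_dag` / `other_dag` ids are simplified, hypothetical stand-ins; the real table has more columns. On a real metadata DB, stop the scheduler and take a backup first. Against MySQL through PyMySQL, the placeholder would be `%s` instead of `?`. This sketch leaves any `task_instance` rows untouched.

```python
import sqlite3


def delete_dag_runs(conn, dag_id, state=None):
    """Delete dag_run rows for one DAG, optionally only those in `state`.

    Returns the number of rows removed.
    """
    sql = "DELETE FROM dag_run WHERE dag_id = ?"
    params = [dag_id]
    if state is not None:
        sql += " AND state = ?"
        params.append(state)
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(sql, params)
    return cur.rowcount


# Simplified, hypothetical stand-in for Airflow's dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("my_dag", "2018-05-16", "success"),
        ("my_dag", "2018-05-17", "running"),
        ("other_dag", "2018-05-17", "running"),
    ],
)
conn.commit()
```

`delete_dag_runs(conn, "my_dag", state="running")` removes only the stuck run and leaves the other DAG's rows alone.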

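    This time I was lucky: the tasks that had not finished finally ran successfully.
    However, my tasks have to run sequentially, and after the first one succeeded nothing else moved. So I stopped the scheduler and started it again.
    As expected, it kept the results of the previous run, skipped the task that had already completed, ran the rest in order, and eventually succeeded.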

    3. Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)

    Running mysql directly fails with:

    Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)
    

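    Run the following instead. With -h 127.0.0.1 the client connects over TCP rather than through the Unix socket file: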

    # mysql -uroot -h 127.0.0.1 -p 
    

    For details, see the article.
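The same choice exists when connecting from Python. In PyMySQL it is explicit: pass `unix_socket=` to use a socket file, or `host=`/`port=` to use TCP. The sketch below picks one or the other. The helper name `mysql_connect_target` and the rule "fall back to TCP on 127.0.0.1 when the socket file is missing" are illustrative assumptions.

```python
import os


def mysql_connect_target(socket_path="/tmp/mysql.sock",
                         host="127.0.0.1", port=3306):
    """Return connection kwargs for pymysql.connect() (hypothetical helper).

    Use the Unix socket if the socket path exists; otherwise fall back to
    TCP on 127.0.0.1, the equivalent of `mysql -h 127.0.0.1`.
    """
    if os.path.exists(socket_path):
        return {"unix_socket": socket_path}
    return {"host": host, "port": port}
```

The result would be splatted into the call, e.g. `pymysql.connect(user="root", password="...", **mysql_connect_target())`.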

    4. Running Airflow commands in the background

    airflow kerberos -D
    airflow scheduler -D
    airflow webserver -D
    Here's the airflow webserver --help output (from version 1.8):
    
    -D, --daemon Daemonize instead of running in the foreground
    
    https://stackoverflow.com/questions/46476246/issues-running-airflow-scheduler-as-a-daemon-process/46479069#46479069
    

    5. How backfill relates to the scheduler

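    The scheduler looks back over earlier dates and automatically backfills any runs that have no record in the DB. You can exploit this to trigger a backfill simply by deleting those records.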
    https://stackoverflow.com/questions/39882204/airflow-backfill-clarification

    When you change the scheduler toggle to "on" for a DAG, the scheduler will trigger a backfill of all dag run instances for which it has no status recorded, starting with the start_date you specify in your "default_args".

    For example: If the start date was "2017-01-21" and you turned on the scheduling toggle at "2017-01-22T00:00:00" and your dag was configured to run hourly, then the scheduler will backfill 24 dag runs and then start running on the scheduled interval.
    This is essentially what is happening in both of your questions. In #1, it is filling in the 3 missing runs from the 30 seconds during which you had turned off the scheduler. In #2, it is filling in all of the DAG runs from start_date until "now".

    There are 2 ways around this:

    1. Set the start_date to a date in the future so that it will only start scheduling dag runs once that date is reached. Note that if you change the start_date of a DAG, you must change the name of the DAG as well due to the way the start date is stored in airflow's DB.
    2. Manually run backfill from the command line with the "-m" flag which tells airflow not to actually run the DAG, rather just mark it as successful in the DB (https://airflow.incubator.apache.org/cli.html).
    e.g. `airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30`
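The figure of 24 runs in the quoted example follows from how Airflow schedules: the run for an execution date starts only after that interval has closed. Below is a simplified sketch of the count. It assumes a fixed `timedelta` schedule and ignores cron expressions, `end_date` and any catchup setting. `missed_run_dates` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


def missed_run_dates(start_date, now, interval):
    """List execution dates the scheduler would backfill (simplified model).

    A run for execution_date d is triggered once its interval has closed,
    i.e. when d + interval <= now.
    """
    dates = []
    d = start_date
    while d + interval <= now:
        dates.append(d)
        d += interval
    return dates


# The example from the quote: start 2017-01-21, toggled on at
# 2017-01-22T00:00:00, hourly schedule.
runs = missed_run_dates(datetime(2017, 1, 21), datetime(2017, 1, 22),
                        timedelta(hours=1))
print(len(runs))          # 24
print(runs[0], runs[-1])  # 2017-01-21 00:00:00 2017-01-21 23:00:00
```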
    

Article link: https://www.haomeiwen.com/subject/epqndftx.html