yisa_get_msg_from_kafka_per_pn.py

发布时间 2023-10-06 22:27:29 作者: 苦逼yw

 

 

#!/usr/bin/python
#-*- coding: utf-8 -*-
# 抽取kafka数据到redis_mq模块
# 作者:王成
# 日期:2017-04-14
import MySQLdb
import time
import sys
import redis
import requests
import json
import yaml,logging
from logging.handlers import TimedRotatingFileHandler,RotatingFileHandler
import time,timeit,datetime,sys,os,threading
import Queue
from daemon import Daemon
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import pickle
import base64
#import pandas as pd

# Python 2 only: force the process-wide default encoding to UTF-8 so implicit
# str <-> unicode conversions (joining mixed str/unicode, writing unicode to a
# plain file, comparing unicode with UTF-8 byte literals) do not fail on
# non-ASCII text. sys.setdefaultencoding is not normally reachable after
# interpreter startup, which is why sys is reloaded first -- the two lines
# must stay in this order. Neither reload() nor setdefaultencoding exists as
# used here in Python 3.
reload(sys)
sys.setdefaultencoding('utf-8')

class MyDaemon(Daemon):
    """Daemon that counts Kafka ``txface`` records per tollgate (checkpoint).

    ``run`` replays partitions 0..KAFKA_PARTITIONS-1 of KAFKA_TOPIC starting at
    KAFKA_START_OFFSET, counts the records whose ``PASS_TIME`` lies inside
    [WINDOW_START, WINDOW_END], and appends one CSV line per tollgate to
    OUTPUT_CSV.  ``execute_sql`` and ``format_msg`` are helpers that ``run``
    itself does not call.
    """

    # Kafka source (previously hard-coded inside run()); subclasses may override.
    KAFKA_BROKERS = ("68.109.74.4:21005,68.109.74.5:21005,68.109.74.6:21005,"
                     "68.109.74.7:21005,68.109.74.8:21005")
    KAFKA_TOPIC = 'txface'
    KAFKA_PARTITIONS = 6
    # The same starting offset is applied to every partition.
    KAFKA_START_OFFSET = 107720000
    # Give up on a partition after this long without a new message instead of
    # blocking forever when it never produces a record past WINDOW_END.
    KAFKA_IDLE_TIMEOUT_MS = 60000

    # Counting window, in the yymmddHHMMSS format of the PASS_TIME field.
    PASS_TIME_FORMAT = "%y%m%d%H%M%S"
    WINDOW_START = "220727194000"
    WINDOW_END = "220727194500"

    OUTPUT_CSV = "count_kafka_per_location.csv"
    # Column titles: tollgate id, pass count, last pass time, tollgate name, type_id.
    CSV_HEADER = ('卡口id', '过车总数', '时间', '卡口名称', 'type_id')

    # Plate strings meaning "no plate recognised" (besides anything under 7 chars).
    NO_PLATE_VALUES = ('未识别', '无车牌')

    def _connect_mysql(self):
        """Open a new MySQLdb connection from the ``mysql_web`` config section."""
        cfg = self.config['mysql_web']
        return MySQLdb.connect(host=cfg['host'], port=cfg['port'],
                               user=cfg['user'], passwd=cfg['password'],
                               db=cfg['database'], charset="utf8")

    def execute_sql(self, sql, action='select', params=None):
        """Run one SQL statement on the cached ``mysql_web`` connection.

        :param sql: SQL text.  Pass values through ``params`` rather than
            formatting them into ``sql``.
        :param action: 'select' returns all rows (as dicts); 'insert' commits
            and returns ``insert_id()``; 'update' commits and returns the
            result of ``cursor.execute``; any other value returns that result
            without committing.
        :param params: optional parameters forwarded to ``cursor.execute``.
        :returns: see ``action``.  On any error the exception is logged and
            [] ('select'), 0 ('insert') or None (otherwise) is returned.
        """
        cursor = None
        try:
            # getattr: run() initialises self.db, but this may be called earlier.
            if getattr(self, 'db', None) is None:
                self.db = self._connect_mysql()
            try:
                self.db.ping()
            except MySQLdb.Error:
                # Connection no longer usable: reconnect once.
                self.db = self._connect_mysql()

            cursor = self.db.cursor(MySQLdb.cursors.DictCursor)
            result = cursor.execute(sql, params)
            if action == 'select':
                result = cursor.fetchall()
            elif action == 'insert':
                result = self.db.insert_id()
                # MySQLdb does not autocommit by default; without this the
                # write is never made durable or visible to other sessions.
                self.db.commit()
            elif action == 'update':
                self.db.commit()
            return result
        except Exception as e:
            logging.exception('连接数据库时错误: %s', str(e))
            if action == 'select':
                return []
            if action == 'insert':
                return 0
            return None
        finally:
            if cursor is not None:
                cursor.close()

    def format_msg(self, msg):
        """Map a raw capture message to the flat row dict used downstream.

        :param msg: dict with keys plateNumber, plateType, locationId,
            deviceId, directionId, captureTime (must have ``strftime``,
            presumably a datetime -- confirm with the producer), imageUrl and
            plateColor.
        :returns: the row dict, or None (after logging) if a key is missing
            or a value cannot be converted.
        """
        try:
            plate = msg['plateNumber']
            # Short or placeholder plates are normalised to '无牌' (no plate).
            if len(plate) < 7 or plate in self.NO_PLATE_VALUES:
                plate = '无牌'
            return {
                'license_plate': plate,
                'plate_type_id': msg['plateType'],
                'region_id': '420100',  # fixed region code
                # Tollgate id: base64-encoded plus the raw value.
                'location_id': base64.b64encode(msg['locationId']),
                'loc_id': msg['locationId'],
                # Device id: base64-encoded plus the raw value.
                'device_id': base64.b64encode(msg['deviceId']),
                'dev_id': msg['deviceId'],
                'lane_id': 0,
                'speed': 0,
                'direction_id': msg['directionId'],
                'capture_time': msg['captureTime'].strftime('%Y-%m-%d %H:%M:%S'),
                'image_url': msg['imageUrl'],
                'plate_color': msg['plateColor'],
            }
        except Exception as e:
            logging.exception('格式化信息时错误: %s', str(e))
            return None

    @staticmethod
    def _setup_logging(name):
        """Log INFO and above to /var/log/<name>.log (128 MiB x 25 backups)."""
        logging.basicConfig(level=logging.INFO)
        handler = RotatingFileHandler('/var/log/%s.log' % name,
                                      maxBytes=134217728, backupCount=25)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        logging.getLogger('').addHandler(handler)

    @staticmethod
    def _add_pass(counts, row, pass_time):
        """Record one in-window pass from ``row`` into ``counts``.

        ``counts`` maps tollgate id -> [id, count, last pass time, name,
        type_id].  All fields are read before ``counts`` is touched, so a
        missing key (KeyError) leaves ``counts`` unchanged.
        """
        tollgate_id = str(row['TOLLGATE_ID'])
        tollgate_name = row['TOLLGATE_NAME']
        type_id = str(row['type_id'])
        when = time.strftime('%Y-%m-%d %H:%M:%S', pass_time)
        entry = counts.get(tollgate_id)
        if entry is None:
            counts[tollgate_id] = [tollgate_id, 1, when, tollgate_name, type_id]
        else:
            # Name and type_id come from the first record seen for this id.
            entry[1] += 1
            entry[2] = when

    def _count_partition(self, consumer, partition_id, counts,
                         window_start, window_end):
        """Replay one partition and add its in-window passes to ``counts``.

        Reading stops at the first record whose PASS_TIME is after
        ``window_end``, or when the consumer times out (partition idle).
        NOTE(review): stopping at the first late record assumes records are
        appended roughly in PASS_TIME order; out-of-order records after that
        point are not counted.

        :returns: False if interrupted by Ctrl+C, True otherwise.
        """
        partition = TopicPartition(self.KAFKA_TOPIC, partition_id)
        consumer.assign([partition])
        consumer.seek(partition, offset=self.KAFKA_START_OFFSET)
        recv_number = 0
        while True:
            try:
                for msg in consumer:
                    recv_number += 1
                    if recv_number % 5000 == 0:
                        logging.warning('offset:%d,recv:%d', msg.offset, recv_number)
                    try:
                        row = json.loads(msg.value)
                        logging.debug(row)
                        pass_time = time.strptime(row['PASS_TIME'],
                                                  self.PASS_TIME_FORMAT)
                        if pass_time > window_end:
                            return True
                        if pass_time < window_start:
                            continue
                        self._add_pass(counts, row, pass_time)
                    except (ValueError, KeyError, TypeError) as e:
                        # One bad record must not stall the whole partition.
                        logging.error('skip bad message (partition %d, offset %d): %s',
                                      partition_id, msg.offset, e)
                        continue
                    if row.get('ECSB') == 'VJ2139':
                        logging.info(row)
                else:
                    # Iteration ended because consumer_timeout_ms elapsed.
                    logging.warning('partition %d idle for %d ms before window end, moving on',
                                    partition_id, self.KAFKA_IDLE_TIMEOUT_MS)
                    return True
            except KeyboardInterrupt:
                logging.error('Ctrl+C,终止运行')
                return False
            except Exception as e:
                # Consumer-level failure: back off, then resume from the
                # consumer's current position.
                logging.exception('读取kafka时错误: %s', str(e))
                time.sleep(10)

    def _write_counts(self, counts):
        """Append the header plus one line per tollgate to OUTPUT_CSV.

        NOTE: fields are joined with ',' without quoting, so a tollgate name
        containing a comma will shift the columns of that line.
        """
        lines = [','.join(self.CSV_HEADER)]
        for tollgate_id, count, last_time, tollgate_name, type_id in counts.values():
            if tollgate_name is None:
                tollgate_name = ''
            lines.append(','.join([tollgate_id, str(count), last_time,
                                   tollgate_name, type_id]))
        with open(self.OUTPUT_CSV, 'a') as f:
            for line in lines:
                f.write(line)
                f.write('\n')

    def run(self):
        """Daemon entry point: count in-window passes per tollgate, write CSV.

        Loads ``config.yaml`` from this file's directory into ``self.config``,
        configures logging, then reads every partition in turn.  Returns
        early without writing the CSV on Ctrl+C; on any other error it logs
        and exits the process with status 1.
        """
        import collections  # local import: keeps the file's import block unchanged

        config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                   'config.yaml')
        with open(config_path) as config_file:
            self.config = yaml.safe_load(config_file)

        name = 'yisa_get_msg_kafka_loc_2'
        self._setup_logging(name)
        logging.warning('启动 [%s]', name)

        # Instance state kept for execute_sql() / format_msg() users.
        self.db = None
        self.err_count = 0  # count of device-id errors
        self.err_dev_time = {}  # per-device time-error counts
        self.crop_config = {
            "420100999999999": {"crop_x": 0, "crop_y": 0, "crop_w": 0, "crop_h": 0},
        }
        self.MSG_QUEEN = Queue.Queue(0)

        # Parse the window bounds once instead of for every message.
        window_start = time.strptime(self.WINDOW_START, self.PASS_TIME_FORMAT)
        window_end = time.strptime(self.WINDOW_END, self.PASS_TIME_FORMAT)
        # Insertion-ordered so the CSV lists tollgates in first-seen order.
        counts = collections.OrderedDict()
        consumer = None
        try:
            logging.warning('创建连接Kafka...')
            consumer = KafkaConsumer(bootstrap_servers=self.KAFKA_BROKERS,
                                     consumer_timeout_ms=self.KAFKA_IDLE_TIMEOUT_MS)
            for partition_id in range(self.KAFKA_PARTITIONS):
                if not self._count_partition(consumer, partition_id, counts,
                                             window_start, window_end):
                    return
            self._write_counts(counts)
        except Exception as e:
            logging.exception('取数据时错误: %s', str(e))
            # Non-zero status: this is a failure, not a clean finish.
            sys.exit(1)
        finally:
            if consumer is not None:
                consumer.close()
    
if __name__ == "__main__":
    # Command-line control: "<script> start|stop|restart".
    # For foreground debugging, call daemon.run() directly instead.
    daemon = MyDaemon('/var/run/yisa_get_msg_kafka_test_2.pid')

    if len(sys.argv) != 2:
        print("usage: %s start|stop|restart" % sys.argv[0])
        sys.exit(2)

    dispatch = {
        'start': daemon.start,
        'stop': daemon.stop,
        'restart': daemon.restart,
    }
    command = dispatch.get(sys.argv[1])
    if command is None:
        print("Unknown command")
        sys.exit(2)

    command()
    sys.exit(0)