網(wǎng)上有很多關(guān)于pos機(jī)系統(tǒng)日切請(qǐng)稍后重試,Java 零入侵記錄系統(tǒng)運(yùn)行日志的知識(shí),也有很多人為大家解答關(guān)于pos機(jī)系統(tǒng)日切請(qǐng)稍后重試的問題,今天pos機(jī)之家(m.afbey.com)為大家整理了關(guān)于這方面的知識(shí),讓我們一起來看下吧!
本文目錄一覽:
1、pos機(jī)系統(tǒng)日切請(qǐng)稍后重試
pos機(jī)系統(tǒng)日切請(qǐng)稍后重試
在做項(xiàng)目時(shí)無意間想不通過切面形式并且支持獨(dú)立部署來實(shí)現(xiàn)系統(tǒng)日志的采集,通過檢索了解了Logstash、Filebeat、Flume、Fluentd、Logagent、rsyslog這些采集器,感覺太復(fù)雜了。通過借鑒開源項(xiàng)目,開發(fā)一個(gè)簡(jiǎn)版的采集器。
源代碼/** * 系統(tǒng)層文件監(jiān)聽 */public static void fileMonitor(){ /** 關(guān)注目錄的事件 */ int mask = JNotify.FILE_CREATED | JNotify.FILE_DELETED | JNotify.FILE_MODIFIED| JNotify.FILE_RENAMED| JNotify.FILE_ANY; /** 是否監(jiān)視子目錄,即級(jí)聯(lián)監(jiān)視 */ boolean watchTree = true; MyJNotifyAdapter jNotifyListener = new MyJNotifyAdapter(); try { int watchID = JNotify.addWatch(monitorDir, mask, watchTree, jNotifyListener); System.out.println("監(jiān)聽程序Id:" + watchID); } catch (JNotifyException e) { e.printStackTrace(); }}
public class MyJNotifyAdapter extends JNotifyAdapter { private static Logger logger = LoggerFactory.getLogger(MyJNotifyAdapter.class); public static final ConcurrentSkipListMap<String,Long> CONTEXT_TIME = new ConcurrentSkipListMap<>(); @Override public void fileCreated(int wd, String rootPath, String name) { logger.info("創(chuàng)建文件 -> 當(dāng)前線程:{},監(jiān)聽程序:{},監(jiān)聽路徑:{},文件名稱:{}",Thread.currentThread().getName(),wd,rootPath,name); } @Override public void fileDeleted(int wd, String rootPath, String name) { logger.info("刪除文件 -> 當(dāng)前線程:{},監(jiān)聽程序:{},監(jiān)聽路徑:{},文件名稱:{}",Thread.currentThread().getName(),wd,rootPath,name); } /** * 注意:JNotify存在一次文件修改,觸發(fā)多次fileModified方法的BUG, * 該方法可以用來修復(fù)一次文件修改可能會(huì)觸發(fā)多個(gè)fileModified方法, * 從而減少?zèng)]有必要的資源重新加載。但是由于t變量是類內(nèi)共享變量,所 * 以注意線程安全,盡量避免共用Listener導(dǎo)致錯(cuò)誤 */ @Override public void fileModified(int wd, String rootPath, String name) { logger.info("修改文件 -> 當(dāng)前線程:{},監(jiān)聽程序:{},監(jiān)聽路徑:{},文件名稱:{}",Thread.currentThread().getName(),wd,rootPath,name); String filePath = rootPath + File.separator + name; File file = new File(filePath); if(!file.isDirectory() && file.exists()){ long lastModified = file.lastModified(); //避免文件修改時(shí)重復(fù)讀取數(shù)據(jù) if(CONTEXT_TIME.get(String.valueOf(lastModified)) != null){ System.out.println("避免文件修改時(shí)重復(fù)讀取數(shù)據(jù)"); }else{ CONTEXT_TIME.put(String.valueOf(lastModified),lastModified); Map<String,Object> hashMap = new HashMap<>(); hashMap.put("wd",wd); hashMap.put("filePath",filePath); hashMap.put("matchStr","start"); MyJNotifyAdapterPublisher.publishEvent(hashMap); } } } @Override public void fileRenamed(int wd, String rootPath, String oldName, String newName) { logger.info("修改文件名稱 -> 當(dāng)前線程:{},監(jiān)聽程序:{},監(jiān)聽路徑:{},舊文件名稱:{},新文件名稱:{}",Thread.currentThread().getName(),wd,rootPath,oldName,newName); } static { GlobalThreadPool.scheduledThreadPool.scheduleWithFixedDelay(() -> { int size = CONTEXT_TIME.size(); if(size > 5){ String lastKey = CONTEXT_TIME.lastKey(); CONTEXT_TIME.keySet().removeIf(key -> !key.equals(lastKey)); logger.info("定時(shí)清理Map中的值:{}",CONTEXT_TIME); } },0, 60 * 60, TimeUnit.MILLISECONDS); }}
public class MyJNotifyAdapterPublisher { public static void publishEvent(Map<String,Object> map){ SpringContextUtils.publishEvent(new MyJNotifyAdapterEvent(map)); }}
/** * 日志采集器 */@Componentpublic class MyJNotifyAdapterListener { private static Logger log = LoggerFactory.getLogger(MyJNotifyAdapterListener.class); /** * @param event */ @Async(value = "taskExecutor") @EventListener(MyJNotifyAdapterEvent.class) public void fileModified(MyJNotifyAdapterEvent event){ Map<String, Object> map = (Map) event.getSource(); log.info("文件修改參數(shù):{}",map); //1.讀取文件 LogFileHandler.handler(map); }}
public class LogFileHandler { private static Logger log = LoggerFactory.getLogger(LogFileHandler.class); /** * 開始日志處理 * @param map */ public static void handler(Map<String,Object> map){ MyAbstractFilter myAbstractFilter = new MyAbstractFilter(); myAbstractFilter.addFilter(new ReadFileFilter()) .addFilter(new BranchFileFilter()) .addFilter(new AnalysisFileFilter()) .addFilter(PolymerizeFilter.getInstance()); myAbstractFilter.doFilter(map,myAbstractFilter); }}
/** * 讀取日志文件 */public class ReadFileFilter implements MyBaseFilter { private static final Logger log = LoggerFactory.getLogger(ReadFileFilter.class); public static final Map<String,Long> CONTEXT_HOLDER = new ConcurrentSkipListMap<>(); @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { String str = readFile(map); if(StrUtil.isBlank(str)){ return; } map.put("input",str); myBaseFilter.doFilter(map,myBaseFilter); } /** * 讀取指定大小的文件內(nèi)容 * @param map -> filePath 文件路徑 * -> size 讀取指定大小的內(nèi)容 * @return string 讀取的內(nèi)容 */ private String readFile(Map<String, Object> map){ StringBuilder stringBuilder = new StringBuilder(); String filePath = String.valueOf(map.get("filePath")); String key = MD5Util.getMD5_32(filePath.replaceAll("/","")); long pos = 0; if(CONTEXT_HOLDER.get(key) != null){ pos = CONTEXT_HOLDER.get(key); if(pos < 0){ pos = 0; } } BufferedRandomAccessFile bufferedRandomAccessFile = null; try { File file = new File(filePath); if(file.isDirectory()){ return null; } bufferedRandomAccessFile = new BufferedRandomAccessFile(filePath,"r"); long length = bufferedRandomAccessFile.length(); //避免文件清空后讀取異常 if(length <= 0){ CONTEXT_HOLDER.put(key,0L); return null; } //每次自讀1024 * 512個(gè)字節(jié) int capacity = 1024 * 512; long readPointer = length - pos; //指定讀取的起始位置 bufferedRandomAccessFile.seek(pos); int capacityLength = -1; if(readPointer <= capacity && readPointer > 0){ capacityLength = (int) readPointer; }else if(readPointer > capacity){ capacityLength = capacity; }else if(readPointer == 0){ capacityLength = (int) length; }else{ capacityLength = (int) - readPointer; } ByteBuffer byteBuffer = ByteBuffer.allocate(capacityLength); int readLength = -1; readLength = bufferedRandomAccessFile.read(byteBuffer.array()); byteBuffer.clear(); byte[] bytes = byteBuffer.array(); String str = new String(bytes, 0, readLength); stringBuilder.append(str); long pointer = bufferedRandomAccessFile.getFilePointer(); CONTEXT_HOLDER.put(key,pointer); //log.info("文件內(nèi)容:{},容量是:{},指針位置:{}", stringBuilder, byteBuffer.capacity() ,pointer); return stringBuilder.toString(); } catch (IOException e) { e.printStackTrace(); return null; } finally { if(bufferedRandomAccessFile != null){ try { bufferedRandomAccessFile.close(); } catch (IOException e) { e.printStackTrace(); } } } }}
/** * 對(duì)讀取到日志數(shù)據(jù)進(jìn)行分行 */public class BranchFileFilter implements MyBaseFilter { @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<String> list = branch(map); if(list.size() == 0){ return; } map.put("list", list); myBaseFilter.doFilter(map,myBaseFilter); } /** * 分行 * 如果指定了行首正則表達(dá)式,則根據(jù)行首正則表達(dá)式對(duì)每次讀取的日志進(jìn)行分行,切分成多條日志; * 如果沒有指定行首正則表達(dá)式,則將一行日志作為一條日志處理 */ private List<String> branch(Map<String,Object> map){ String input = String.valueOf(map.get("input")); String matchStr = String.valueOf(map.get("matchStr")); Map<Integer,Integer> hashMap = handlerPosition(input,matchStr); if(hashMap == null){ return null; } List<String> list = section(input,hashMap); return list; } /** * 計(jì)算出每一對(duì)開始位置與結(jié)束位置 * @param input 數(shù)據(jù)源 * @param matchStr 指定的字符串 * @return Map 開始位置與結(jié)束位置 */ private Map<Integer,Integer> handlerPosition(String input,String matchStr){ List<Integer> list = getListPosition(input,matchStr); if(list == null){ return null; } Map<Integer,Integer> hashMap = new HashMap<>(); int size = list.size(); if(size % 2 == 0){ for(int i = 0; i < size; i++){ if(i + 2 <= size){ hashMap.put(list.get(i),list.get(i+1)); } } } else{ for(int i = 0; i < size; i++){ if(i + 1 < size ){ hashMap.put(list.get(i),list.get(i+1)); } } } Map<Integer,Integer> result = new LinkedHashMap<>(); hashMap.entrySet().stream() .sorted(Map.Entry.comparingByKey()) .forEachOrdered(x -> result.put(x.getKey(), x.getValue())); return result; } /** * 獲取指定的字符串出現(xiàn)的所有的位置 * @param input 數(shù)據(jù)源 * @param matchStr 指定的字符串 * @return */ private List<Integer> getListPosition(String input,String matchStr){ if(StrUtil.isNotBlank(input)){ List<Integer> list = new ArrayList<>(); int position = input.indexOf(matchStr); if(position == -1){ return null; } list.add(position); while (position != -1){ position = input.indexOf(matchStr, position + 1); list.add(position); } return list; } return null; } /** * 根據(jù)位置截取每段日志 * @param input * @param map * @return */ private List<String> section(String input,Map<Integer,Integer> map){ List<String> list = new ArrayList<>(); String section = ""; for(Map.Entry<Integer, Integer> entry : map.entrySet()){ if(entry.getValue() != -1){ section = input.substring(entry.getKey(),entry.getValue()); }else{ section = input.substring(entry.getKey()); } list.add(section); } return list; }}
/** * 對(duì)分行的日志數(shù)據(jù)進(jìn)行解析 * 1、開啟丟棄解析失敗日志,則直接丟棄該日志,并上報(bào)解析失敗的報(bào)錯(cuò)信息。 * 2、關(guān)閉丟棄解析失敗日志,則上傳解析失敗的原始日志,其中Key為raw_log、Value為日志內(nèi)容。 * 3、設(shè)置日志時(shí)間字段 * 3.1、如果未配置時(shí)間字段,則日志時(shí)間為當(dāng)前解析日志的時(shí)間。 * 3.2、如果配置了時(shí)間字段: * 日志中記錄的時(shí)間距離當(dāng)前時(shí)間12小時(shí)以內(nèi),則從解析的日志字段中提取時(shí)間。 * 日志中記錄的時(shí)間距離當(dāng)前時(shí)間12小時(shí)以上,則丟棄該日志并上傳錯(cuò)誤信息。 */public class AnalysisFileFilter implements MyBaseFilter { Pattern pattern = Pattern.compile("(\\\\d{1,4}[-|\\\\/]\\\\d{1,2}[-|\\\\/]\\\\d{1,2} \\\\d{1,2}:\\\\d{1,2}:\\\\d{1,2})", Pattern.CASE_INSENSITIVE|Pattern.MULTILINE); @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<Map<String, Object>> list = analysis(map); if(list.size() == 0){ return; } map.put("list",list); myBaseFilter.doFilter(map,myBaseFilter); } /** * 解析 * 根據(jù)配置的采集模式,對(duì)每條日志內(nèi)容進(jìn)行解析 * 開啟丟棄解析失敗日志,則直接丟棄該日志,并上報(bào)解析失敗的報(bào)錯(cuò)信息 * 關(guān)閉丟棄解析失敗日志,則上傳解析失敗的原始日志,其中Key為raw_log、Value為日志內(nèi)容 */ private List<Map<String, Object>> analysis(Map<String, Object> map){ List<Map<String, Object>> arrayList = new ArrayList<>(); Map<String, Object> hashMap; List<String> list = (List<String>) map.get("list"); Matcher matcher; String time = DateUtil.current(); for(String str : list){ hashMap = new HashMap<>(); matcher = pattern.matcher(str); if(matcher.find() && matcher.groupCount() >= 1){ time = matcher.group(0); } hashMap.put("value", str); hashMap.put("time", time); arrayList.add(hashMap); } return arrayList; }}
/** * 聚合日志 */public class PolymerizeFilter implements MyBaseFilter{ List<Map<String,Object>> list = new ArrayList<>(); private static class SingletonClassInstance{ private static final PolymerizeFilter instance= new PolymerizeFilter(); } private PolymerizeFilter(){} public static PolymerizeFilter getInstance(){ return SingletonClassInstance.instance; } @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<Map<String, Object>> arrayList = (List<Map<String, Object>>) map.get("list"); this.list.addAll(arrayList); polymerize(list); return; } /** * 聚合日志: * 為降低網(wǎng)絡(luò)請(qǐng)求次數(shù),在日志處理、過濾完畢后,會(huì)在系統(tǒng)內(nèi)部緩存一段時(shí)間后進(jìn)行聚合打包,再發(fā)送到日志服務(wù)。 * 緩存數(shù)據(jù)后,如果滿足以下條件之一,則即時(shí)打包日志發(fā)送到日志服務(wù)。 * 日志聚合時(shí)間超過3秒。 * 日志聚合條數(shù)超過4096條。 * 日志聚合總大小超過512KB。 */ public void polymerize(List<Map<String,Object>> list){ if(list.size() >= 4096){ sendLog(list); } if(getByteSize(list) / 1024 >= 512){ sendLog(list); } if(list.size() > 0){ sendLog(list); } } /** * 發(fā)送日志: * 將采集到的日志聚合并發(fā)送到日志服務(wù)。 * 設(shè)置啟動(dòng)參數(shù)配置文件中的max_bytes_per_sec參數(shù)和send_request_concurrency參數(shù)可調(diào)整日志數(shù)據(jù)的發(fā)送速度和最大并發(fā)數(shù) * 如果數(shù)據(jù)發(fā)送失敗,Logtail自動(dòng)根據(jù)錯(cuò)誤信息決定重試或放棄發(fā)送。 */ private void sendLog(List<Map<String,Object>> list){ System.out.println("發(fā)送日志--->list"+list); Map<String,Object> hashMap = new HashMap<>(); hashMap.put("list",list); LogPublisher.publishEvent(hashMap); //this.list.clear(); } /** * 得到數(shù)據(jù)字節(jié)大小 * @param list * @return */ private long getByteSize(List<?> list){ long byteSize; ByteArrayOutputStream baos = null; ObjectOutputStream os = null; try { baos = new ByteArrayOutputStream(); os = new ObjectOutputStream(baos); os.writeObject(list); byteSize = baos.size(); return byteSize; } catch (IOException e) { e.printStackTrace(); } finally { if(os != null){ try { os.close(); } catch (IOException e) { e.printStackTrace(); } } if(baos != null){ try { baos.close(); } catch (IOException e) { e.printStackTrace(); } } } return 0; } public List<Map<String, Object>> getList() { return list; } public void clareList(){ this.list.clear(); }}
/** * 日志推送 */@Componentpublic class LogListener { private static Logger log = LoggerFactory.getLogger(LogListener.class); /** * 需要使用隊(duì)列接收傳入的數(shù)據(jù),日志實(shí)時(shí)產(chǎn)生,處理日志則需要時(shí)間 * @param event */ @Async(value = "taskExecutor") @EventListener(LogEvent.class) public void fileModified(LogEvent event){ Map<String,Object> list = (Map<String,Object>) event.getSource(); log.info("保存采集的日志數(shù)據(jù):{}",list); //保存采集到的日志到數(shù)據(jù)庫(kù) // PolymerizeFilter.getInstance().clareList(); }}
以上就是關(guān)于pos機(jī)系統(tǒng)日切請(qǐng)稍后重試,Java 零入侵記錄系統(tǒng)運(yùn)行日志的知識(shí),后面我們會(huì)繼續(xù)為大家整理關(guān)于pos機(jī)系統(tǒng)日切請(qǐng)稍后重試的知識(shí),希望能夠幫助到大家!
